You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/10/18 20:35:57 UTC
[7/9] incubator-beam git commit: Optimize globally windowed side input case
Optimize globally windowed side input case
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/82f553e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/82f553e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/82f553e1
Branch: refs/heads/python-sdk
Commit: 82f553e1a777a8f88ca163e07d631d3df3e1f321
Parents: 989a189
Author: Robert Bradshaw <ro...@google.com>
Authored: Thu Oct 13 18:17:30 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Tue Oct 18 12:17:16 2016 -0700
----------------------------------------------------------------------
.../examples/cookbook/bigquery_side_input.py | 1 -
sdks/python/apache_beam/pvalue.py | 4 ++--
sdks/python/apache_beam/runners/common.py | 16 ++++++++++++----
sdks/python/apache_beam/runners/direct_runner.py | 5 -----
.../inprocess/inprocess_evaluation_context.py | 5 -----
sdks/python/apache_beam/transforms/sideinputs.py | 9 ++++++---
.../apache_beam/transforms/sideinputs_test.py | 2 +-
7 files changed, 21 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/82f553e1/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
index ffba786..8a53637 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
@@ -33,7 +33,6 @@ from random import randrange
import apache_beam as beam
-from apache_beam.pvalue import AsIter
from apache_beam.pvalue import AsList
from apache_beam.pvalue import AsSingleton
from apache_beam.utils.options import PipelineOptions
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/82f553e1/sdks/python/apache_beam/pvalue.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py
index f8b7feb..e40d746 100644
--- a/sdks/python/apache_beam/pvalue.py
+++ b/sdks/python/apache_beam/pvalue.py
@@ -276,8 +276,8 @@ class SingletonPCollectionView(PCollectionView):
return head[0]
else:
raise ValueError(
- 'PCollection with more than one element accessed as '
- 'a singleton view.')
+ 'PCollection with more than one element accessed as '
+ 'a singleton view.')
class IterablePCollectionView(PCollectionView):
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/82f553e1/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index cd06879..e500fd8 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -76,17 +76,25 @@ class DoFnRunner(Receiver):
self.dofn = fn
self.dofn_process = fn.process
else:
+ global_window = window.GlobalWindow()
# TODO(robertwb): Remove when all runners pass side input maps.
- # TODO(robertwb): Optimize for global windows case.
- side_inputs = [side_input if isinstance(side_input, sideinputs.SideInputMap)
- else {window.GlobalWindow(): side_input}
+ side_inputs = [side_input
+ if isinstance(side_input, sideinputs.SideInputMap)
+ else {global_window: side_input}
for side_input in side_inputs]
+ if side_inputs and all(side_input.is_globally_windowed()
+ for side_input in side_inputs):
+ args, kwargs = util.insert_values_in_args(
+ args, kwargs, [side_input[global_window]
+ for side_input in side_inputs])
+ side_inputs = []
if side_inputs:
self.has_windowed_side_inputs = True
+
def process(context):
w = context.windows[0]
cur_args, cur_kwargs = util.insert_values_in_args(
- args, kwargs, [side_input[w] for side_input in side_inputs])
+ args, kwargs, [side_input[w] for side_input in side_inputs])
return fn.process(context, *cur_args, **cur_kwargs)
self.dofn_process = process
elif kwargs:
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/82f553e1/sdks/python/apache_beam/runners/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct_runner.py b/sdks/python/apache_beam/runners/direct_runner.py
index 936043c..c4c52b3 100644
--- a/sdks/python/apache_beam/runners/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct_runner.py
@@ -30,11 +30,6 @@ import logging
from apache_beam import coders
from apache_beam import error
-from apache_beam.pvalue import DictPCollectionView
-from apache_beam.pvalue import EmptySideInput
-from apache_beam.pvalue import IterablePCollectionView
-from apache_beam.pvalue import ListPCollectionView
-from apache_beam.pvalue import SingletonPCollectionView
from apache_beam.runners.common import DoFnRunner
from apache_beam.runners.common import DoFnState
from apache_beam.runners.runner import PipelineResult
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/82f553e1/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py b/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
index c6bd41f..7af1608 100644
--- a/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
+++ b/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
@@ -22,11 +22,6 @@ from __future__ import absolute_import
import collections
import threading
-from apache_beam.pvalue import DictPCollectionView
-from apache_beam.pvalue import EmptySideInput
-from apache_beam.pvalue import IterablePCollectionView
-from apache_beam.pvalue import ListPCollectionView
-from apache_beam.pvalue import SingletonPCollectionView
from apache_beam.transforms import sideinputs
from apache_beam.runners.inprocess.clock import Clock
from apache_beam.runners.inprocess.inprocess_watermark_manager import InProcessWatermarkManager
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/82f553e1/sdks/python/apache_beam/transforms/sideinputs.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py
index 2079c93..182179a 100644
--- a/sdks/python/apache_beam/transforms/sideinputs.py
+++ b/sdks/python/apache_beam/transforms/sideinputs.py
@@ -170,7 +170,7 @@ def default_window_mapping_fn(target_window_fn):
else:
def map_via_end(source_window):
return list(target_window_fn.assign(
- window.WindowFn.AssignContext(source_window.max_timestamp())))[-1]
+ window.WindowFn.AssignContext(source_window.max_timestamp())))[-1]
return map_via_end
@@ -188,14 +188,17 @@ class SideInputMap(object):
if window not in self._cache:
target_window = self._window_mapping_fn(window)
self._cache[window] = self._view_class.from_iterable(
- _FilteringIterable(self._iterable, target_window), self._view_options)
+ _FilteringIterable(self._iterable, target_window), self._view_options)
return self._cache[window]
+ def is_globally_windowed(self):
+ return self._window_mapping_fn == _global_window_mapping_fn
+
class _FilteringIterable(object):
"""An iterable containing only those values in the given window.
"""
-
+
def __init__(self, iterable, target_window):
self._iterable = iterable
self._target_window = target_window
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/82f553e1/sdks/python/apache_beam/transforms/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py
index f84ff57..28e324a 100644
--- a/sdks/python/apache_beam/transforms/sideinputs_test.py
+++ b/sdks/python/apache_beam/transforms/sideinputs_test.py
@@ -101,7 +101,7 @@ class SideInputsTest(unittest.TestCase):
[1, 2, 11],
window.FixedWindows(10),
side_input_type=beam.pvalue.AsIter,
- expected=[(1, [1, 2]), (2, [1, 2]), (11, [11])])
+ expected=[(1, [1, 2]), (2, [1, 2]), (11, [11])])
def test_windowed_singleton(self):
self.run_windowed_side_inputs(