Posted to commits@beam.apache.org by al...@apache.org on 2017/06/14 23:47:35 UTC

[1/2] beam git commit: Choose GroupAlsoByWindows implementation based on streaming flag

Repository: beam
Updated Branches:
  refs/heads/master dd9abc397 -> c96208347


Choose GroupAlsoByWindows implementation based on streaming flag


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0b600d20
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0b600d20
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0b600d20

Branch: refs/heads/master
Commit: 0b600d20de2cf2e6071d1d288d4b6a4795df710a
Parents: dd9abc3
Author: Charles Chen <cc...@google.com>
Authored: Wed Jun 7 16:09:10 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Jun 14 16:47:20 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/options/pipeline_options.py     |  9 --
 .../apache_beam/runners/direct/direct_runner.py | 28 ++++++
 sdks/python/apache_beam/transforms/core.py      | 89 +++++++++++---------
 3 files changed, 79 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0b600d20/sdks/python/apache_beam/options/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index daef3a7..8598e05 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -18,7 +18,6 @@
 """Pipeline options obtained from command line parsing."""
 
 import argparse
-import warnings
 
 from apache_beam.transforms.display import HasDisplayData
 from apache_beam.options.value_provider import StaticValueProvider
@@ -279,14 +278,6 @@ class StandardOptions(PipelineOptions):
                         action='store_true',
                         help='Whether to enable streaming mode.')
 
-  # TODO(BEAM-1265): Remove this warning, once at least one runner supports
-  # streaming pipelines.
-  def validate(self, validator):
-    errors = []
-    if self.view_as(StandardOptions).streaming:
-      warnings.warn('Streaming pipelines are not supported.')
-    return errors
-
 
 class TypeOptions(PipelineOptions):
 

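For reference, the --streaming flag whose warning is removed here is the
same StandardOptions flag the DirectRunner change below consults. A
minimal sketch of setting and reading it (illustrative only, not part of
this commit):

  from apache_beam.options.pipeline_options import PipelineOptions
  from apache_beam.options.pipeline_options import StandardOptions

  # '--streaming' sets StandardOptions.streaming via the store_true
  # argument registered above; runners read it through view_as().
  options = PipelineOptions(['--streaming'])
  assert options.view_as(StandardOptions).streaming
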
http://git-wip-us.apache.org/repos/asf/beam/blob/0b600d20/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 323f44b..d80ef10 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -26,19 +26,34 @@ from __future__ import absolute_import
 import collections
 import logging
 
+from apache_beam import typehints
 from apache_beam.metrics.execution import MetricsEnvironment
 from apache_beam.runners.direct.bundle_factory import BundleFactory
 from apache_beam.runners.runner import PipelineResult
 from apache_beam.runners.runner import PipelineRunner
 from apache_beam.runners.runner import PipelineState
 from apache_beam.runners.runner import PValueCache
+from apache_beam.transforms.core import _GroupAlsoByWindow
 from apache_beam.options.pipeline_options import DirectOptions
+from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.options.value_provider import RuntimeValueProvider
 
 
 __all__ = ['DirectRunner']
 
 
+# Type variables.
+K = typehints.TypeVariable('K')
+V = typehints.TypeVariable('V')
+
+
+@typehints.with_input_types(typehints.KV[K, typehints.Iterable[V]])
+@typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
+class _StreamingGroupAlsoByWindow(_GroupAlsoByWindow):
+  """Streaming GroupAlsoByWindow placeholder for overriding in DirectRunner."""
+  pass
+
+
 class DirectRunner(PipelineRunner):
   """Executes a single pipeline on the local machine."""
 
@@ -64,6 +79,19 @@ class DirectRunner(PipelineRunner):
     except NotImplementedError:
       return transform.expand(pcoll)
 
+  def apply__GroupAlsoByWindow(self, transform, pcoll):
+    if (transform.__class__ == _GroupAlsoByWindow and
+        pcoll.pipeline._options.view_as(StandardOptions).streaming):
+      # Use specialized streaming implementation, if requested.
+      raise NotImplementedError(
+          'Streaming support is not yet available on the DirectRunner.')
+      # TODO(ccy): enable when streaming implementation is plumbed through.
+      # type_hints = transform.get_type_hints()
+      # return pcoll | (_StreamingGroupAlsoByWindow(transform.windowing)
+      #                 .with_input_types(*type_hints.input_types[0])
+      #                 .with_output_types(*type_hints.output_types[0]))
+    return transform.expand(pcoll)
+
   def run(self, pipeline):
     """Execute the entire pipeline and returns an DirectPipelineResult."""
 

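A note on the method name: PipelineRunner dispatches apply() on the
transform's class name, so _GroupAlsoByWindow (leading underscore
included) maps to apply__GroupAlsoByWindow with a double underscore. A
simplified sketch of that dispatch, assuming the usual walk up the
transform's class hierarchy (not the actual PipelineRunner code):

  # Assumed dispatch behavior, for illustration only: try
  # 'apply_<ClassName>' for each class in the transform's MRO and
  # fall back to the generic expansion.
  def dispatch_apply(runner, transform, pcoll):
    for cls in type(transform).__mro__:
      override = getattr(runner, 'apply_%s' % cls.__name__, None)
      if override is not None:
        return override(transform, pcoll)
    return transform.expand(pcoll)
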
http://git-wip-us.apache.org/repos/asf/beam/blob/0b600d20/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index a137a13..c30136d 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1078,40 +1078,6 @@ class GroupByKey(PTransform):
       key_type, value_type = trivial_inference.key_value_types(input_type)
       return Iterable[KV[key_type, typehints.WindowedValue[value_type]]]
 
-  class GroupAlsoByWindow(DoFn):
-    # TODO(robertwb): Support combiner lifting.
-
-    def __init__(self, windowing):
-      super(GroupByKey.GroupAlsoByWindow, self).__init__()
-      self.windowing = windowing
-
-    def infer_output_type(self, input_type):
-      key_type, windowed_value_iter_type = trivial_inference.key_value_types(
-          input_type)
-      value_type = windowed_value_iter_type.inner_type.inner_type
-      return Iterable[KV[key_type, Iterable[value_type]]]
-
-    def start_bundle(self):
-      # pylint: disable=wrong-import-order, wrong-import-position
-      from apache_beam.transforms.trigger import InMemoryUnmergedState
-      from apache_beam.transforms.trigger import create_trigger_driver
-      # pylint: enable=wrong-import-order, wrong-import-position
-      self.driver = create_trigger_driver(self.windowing, True)
-      self.state_type = InMemoryUnmergedState
-
-    def process(self, element):
-      k, vs = element
-      state = self.state_type()
-      # TODO(robertwb): Conditionally process in smaller chunks.
-      for wvalue in self.driver.process_elements(state, vs, MIN_TIMESTAMP):
-        yield wvalue.with_value((k, wvalue.value))
-      while state.timers:
-        fired = state.get_and_clear_timers()
-        for timer_window, (name, time_domain, fire_time) in fired:
-          for wvalue in self.driver.process_timer(
-              timer_window, name, time_domain, fire_time, state):
-            yield wvalue.with_value((k, wvalue.value))
-
   def expand(self, pcoll):
     # This code path is only used in the local direct runner.  For Dataflow
     # runner execution, the GroupByKey transform is expanded on the service.
@@ -1136,8 +1102,7 @@ class GroupByKey(PTransform):
               | 'GroupByKey' >> (_GroupByKeyOnly()
                  .with_input_types(reify_output_type)
                  .with_output_types(gbk_input_type))
-              | ('GroupByWindow' >> ParDo(
-                     self.GroupAlsoByWindow(pcoll.windowing))
+              | ('GroupByWindow' >> _GroupAlsoByWindow(pcoll.windowing)
                  .with_input_types(gbk_input_type)
                  .with_output_types(gbk_output_type)))
     else:
@@ -1145,8 +1110,7 @@ class GroupByKey(PTransform):
       return (pcoll
               | 'ReifyWindows' >> ParDo(self.ReifyWindows())
               | 'GroupByKey' >> _GroupByKeyOnly()
-              | 'GroupByWindow' >> ParDo(
-                    self.GroupAlsoByWindow(pcoll.windowing)))
+              | 'GroupByWindow' >> _GroupAlsoByWindow(pcoll.windowing))
 
 
 @typehints.with_input_types(typehints.KV[K, V])
@@ -1162,6 +1126,55 @@ class _GroupByKeyOnly(PTransform):
     return pvalue.PCollection(pcoll.pipeline)
 
 
+@typehints.with_input_types(typehints.KV[K, typehints.Iterable[V]])
+@typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
+class _GroupAlsoByWindow(ParDo):
+  """The GroupAlsoByWindow transform."""
+  def __init__(self, windowing):
+    super(_GroupAlsoByWindow, self).__init__(
+        _GroupAlsoByWindowDoFn(windowing))
+    self.windowing = windowing
+
+  def expand(self, pcoll):
+    self._check_pcollection(pcoll)
+    return pvalue.PCollection(pcoll.pipeline)
+
+
+class _GroupAlsoByWindowDoFn(DoFn):
+  # TODO(robertwb): Support combiner lifting.
+
+  def __init__(self, windowing):
+    super(_GroupAlsoByWindowDoFn, self).__init__()
+    self.windowing = windowing
+
+  def infer_output_type(self, input_type):
+    key_type, windowed_value_iter_type = trivial_inference.key_value_types(
+        input_type)
+    value_type = windowed_value_iter_type.inner_type.inner_type
+    return Iterable[KV[key_type, Iterable[value_type]]]
+
+  def start_bundle(self):
+    # pylint: disable=wrong-import-order, wrong-import-position
+    from apache_beam.transforms.trigger import InMemoryUnmergedState
+    from apache_beam.transforms.trigger import create_trigger_driver
+    # pylint: enable=wrong-import-order, wrong-import-position
+    self.driver = create_trigger_driver(self.windowing, True)
+    self.state_type = InMemoryUnmergedState
+
+  def process(self, element):
+    k, vs = element
+    state = self.state_type()
+    # TODO(robertwb): Conditionally process in smaller chunks.
+    for wvalue in self.driver.process_elements(state, vs, MIN_TIMESTAMP):
+      yield wvalue.with_value((k, wvalue.value))
+    while state.timers:
+      fired = state.get_and_clear_timers()
+      for timer_window, (name, time_domain, fire_time) in fired:
+        for wvalue in self.driver.process_timer(
+            timer_window, name, time_domain, fire_time, state):
+          yield wvalue.with_value((k, wvalue.value))
+
+
 class Partition(PTransformWithSideInputs):
   """Split a PCollection into several partitions.
 

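The refactoring above is behavior-preserving for users: GroupByKey still
expands into ReifyWindows, _GroupByKeyOnly, and the (now top-level)
_GroupAlsoByWindow. A minimal batch-mode usage sketch on the DirectRunner
(streaming remains a placeholder per the runner change above):

  import apache_beam as beam

  # In the default global window this emits one pair per key,
  # e.g. ('a', [1, 2]) and ('b', [3]).
  p = beam.Pipeline()
  grouped = (p
             | beam.Create([('a', 1), ('a', 2), ('b', 3)])
             | beam.GroupByKey())
  p.run()
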

[2/2] beam git commit: This closes #3318

Posted by al...@apache.org.
This closes #3318


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c9620834
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c9620834
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c9620834

Branch: refs/heads/master
Commit: c96208347c662b3981ca2b622cad92637d4df3a2
Parents: dd9abc3 0b600d2
Author: Ahmet Altay <al...@google.com>
Authored: Wed Jun 14 16:47:23 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Jun 14 16:47:23 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/options/pipeline_options.py     |  9 --
 .../apache_beam/runners/direct/direct_runner.py | 28 ++++++
 sdks/python/apache_beam/transforms/core.py      | 89 +++++++++++---------
 3 files changed, 79 insertions(+), 47 deletions(-)
----------------------------------------------------------------------