You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/06/14 23:47:35 UTC
[1/2] beam git commit: Choose GroupAlsoByWindows implementation based on streaming flag
Repository: beam
Updated Branches:
refs/heads/master dd9abc397 -> c96208347
Choose GroupAlsoByWindows implementation based on streaming flag
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0b600d20
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0b600d20
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0b600d20
Branch: refs/heads/master
Commit: 0b600d20de2cf2e6071d1d288d4b6a4795df710a
Parents: dd9abc3
Author: Charles Chen <cc...@google.com>
Authored: Wed Jun 7 16:09:10 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Jun 14 16:47:20 2017 -0700
----------------------------------------------------------------------
.../apache_beam/options/pipeline_options.py | 9 --
.../apache_beam/runners/direct/direct_runner.py | 28 ++++++
sdks/python/apache_beam/transforms/core.py | 89 +++++++++++---------
3 files changed, 79 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/0b600d20/sdks/python/apache_beam/options/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index daef3a7..8598e05 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -18,7 +18,6 @@
"""Pipeline options obtained from command line parsing."""
import argparse
-import warnings
from apache_beam.transforms.display import HasDisplayData
from apache_beam.options.value_provider import StaticValueProvider
@@ -279,14 +278,6 @@ class StandardOptions(PipelineOptions):
action='store_true',
help='Whether to enable streaming mode.')
- # TODO(BEAM-1265): Remove this warning, once at least one runner supports
- # streaming pipelines.
- def validate(self, validator):
- errors = []
- if self.view_as(StandardOptions).streaming:
- warnings.warn('Streaming pipelines are not supported.')
- return errors
-
class TypeOptions(PipelineOptions):
http://git-wip-us.apache.org/repos/asf/beam/blob/0b600d20/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 323f44b..d80ef10 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -26,19 +26,34 @@ from __future__ import absolute_import
import collections
import logging
+from apache_beam import typehints
from apache_beam.metrics.execution import MetricsEnvironment
from apache_beam.runners.direct.bundle_factory import BundleFactory
from apache_beam.runners.runner import PipelineResult
from apache_beam.runners.runner import PipelineRunner
from apache_beam.runners.runner import PipelineState
from apache_beam.runners.runner import PValueCache
+from apache_beam.transforms.core import _GroupAlsoByWindow
from apache_beam.options.pipeline_options import DirectOptions
+from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.value_provider import RuntimeValueProvider
__all__ = ['DirectRunner']
+# Type variables.
+K = typehints.TypeVariable('K')
+V = typehints.TypeVariable('V')
+
+
+@typehints.with_input_types(typehints.KV[K, typehints.Iterable[V]])
+@typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
+class _StreamingGroupAlsoByWindow(_GroupAlsoByWindow):
+ """Streaming GroupAlsoByWindow placeholder for overriding in DirectRunner."""
+ pass
+
+
class DirectRunner(PipelineRunner):
"""Executes a single pipeline on the local machine."""
@@ -64,6 +79,19 @@ class DirectRunner(PipelineRunner):
except NotImplementedError:
return transform.expand(pcoll)
+ def apply__GroupAlsoByWindow(self, transform, pcoll):
+ if (transform.__class__ == _GroupAlsoByWindow and
+ pcoll.pipeline._options.view_as(StandardOptions).streaming):
+ # Use specialized streaming implementation, if requested.
+ raise NotImplementedError(
+ 'Streaming support is not yet available on the DirectRunner.')
+ # TODO(ccy): enable when streaming implementation is plumbed through.
+ # type_hints = transform.get_type_hints()
+ # return pcoll | (_StreamingGroupAlsoByWindow(transform.windowing)
+ # .with_input_types(*type_hints.input_types[0])
+ # .with_output_types(*type_hints.output_types[0]))
+ return transform.expand(pcoll)
+
def run(self, pipeline):
"""Execute the entire pipeline and returns an DirectPipelineResult."""
http://git-wip-us.apache.org/repos/asf/beam/blob/0b600d20/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index a137a13..c30136d 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1078,40 +1078,6 @@ class GroupByKey(PTransform):
key_type, value_type = trivial_inference.key_value_types(input_type)
return Iterable[KV[key_type, typehints.WindowedValue[value_type]]]
- class GroupAlsoByWindow(DoFn):
- # TODO(robertwb): Support combiner lifting.
-
- def __init__(self, windowing):
- super(GroupByKey.GroupAlsoByWindow, self).__init__()
- self.windowing = windowing
-
- def infer_output_type(self, input_type):
- key_type, windowed_value_iter_type = trivial_inference.key_value_types(
- input_type)
- value_type = windowed_value_iter_type.inner_type.inner_type
- return Iterable[KV[key_type, Iterable[value_type]]]
-
- def start_bundle(self):
- # pylint: disable=wrong-import-order, wrong-import-position
- from apache_beam.transforms.trigger import InMemoryUnmergedState
- from apache_beam.transforms.trigger import create_trigger_driver
- # pylint: enable=wrong-import-order, wrong-import-position
- self.driver = create_trigger_driver(self.windowing, True)
- self.state_type = InMemoryUnmergedState
-
- def process(self, element):
- k, vs = element
- state = self.state_type()
- # TODO(robertwb): Conditionally process in smaller chunks.
- for wvalue in self.driver.process_elements(state, vs, MIN_TIMESTAMP):
- yield wvalue.with_value((k, wvalue.value))
- while state.timers:
- fired = state.get_and_clear_timers()
- for timer_window, (name, time_domain, fire_time) in fired:
- for wvalue in self.driver.process_timer(
- timer_window, name, time_domain, fire_time, state):
- yield wvalue.with_value((k, wvalue.value))
-
def expand(self, pcoll):
# This code path is only used in the local direct runner. For Dataflow
# runner execution, the GroupByKey transform is expanded on the service.
@@ -1136,8 +1102,7 @@ class GroupByKey(PTransform):
| 'GroupByKey' >> (_GroupByKeyOnly()
.with_input_types(reify_output_type)
.with_output_types(gbk_input_type))
- | ('GroupByWindow' >> ParDo(
- self.GroupAlsoByWindow(pcoll.windowing))
+ | ('GroupByWindow' >> _GroupAlsoByWindow(pcoll.windowing)
.with_input_types(gbk_input_type)
.with_output_types(gbk_output_type)))
else:
@@ -1145,8 +1110,7 @@ class GroupByKey(PTransform):
return (pcoll
| 'ReifyWindows' >> ParDo(self.ReifyWindows())
| 'GroupByKey' >> _GroupByKeyOnly()
- | 'GroupByWindow' >> ParDo(
- self.GroupAlsoByWindow(pcoll.windowing)))
+ | 'GroupByWindow' >> _GroupAlsoByWindow(pcoll.windowing))
@typehints.with_input_types(typehints.KV[K, V])
@@ -1162,6 +1126,55 @@ class _GroupByKeyOnly(PTransform):
return pvalue.PCollection(pcoll.pipeline)
+@typehints.with_input_types(typehints.KV[K, typehints.Iterable[V]])
+@typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
+class _GroupAlsoByWindow(ParDo):
+ """The GroupAlsoByWindow transform."""
+ def __init__(self, windowing):
+ super(_GroupAlsoByWindow, self).__init__(
+ _GroupAlsoByWindowDoFn(windowing))
+ self.windowing = windowing
+
+ def expand(self, pcoll):
+ self._check_pcollection(pcoll)
+ return pvalue.PCollection(pcoll.pipeline)
+
+
+class _GroupAlsoByWindowDoFn(DoFn):
+ # TODO(robertwb): Support combiner lifting.
+
+ def __init__(self, windowing):
+ super(_GroupAlsoByWindowDoFn, self).__init__()
+ self.windowing = windowing
+
+ def infer_output_type(self, input_type):
+ key_type, windowed_value_iter_type = trivial_inference.key_value_types(
+ input_type)
+ value_type = windowed_value_iter_type.inner_type.inner_type
+ return Iterable[KV[key_type, Iterable[value_type]]]
+
+ def start_bundle(self):
+ # pylint: disable=wrong-import-order, wrong-import-position
+ from apache_beam.transforms.trigger import InMemoryUnmergedState
+ from apache_beam.transforms.trigger import create_trigger_driver
+ # pylint: enable=wrong-import-order, wrong-import-position
+ self.driver = create_trigger_driver(self.windowing, True)
+ self.state_type = InMemoryUnmergedState
+
+ def process(self, element):
+ k, vs = element
+ state = self.state_type()
+ # TODO(robertwb): Conditionally process in smaller chunks.
+ for wvalue in self.driver.process_elements(state, vs, MIN_TIMESTAMP):
+ yield wvalue.with_value((k, wvalue.value))
+ while state.timers:
+ fired = state.get_and_clear_timers()
+ for timer_window, (name, time_domain, fire_time) in fired:
+ for wvalue in self.driver.process_timer(
+ timer_window, name, time_domain, fire_time, state):
+ yield wvalue.with_value((k, wvalue.value))
+
+
class Partition(PTransformWithSideInputs):
"""Split a PCollection into several partitions.
[2/2] beam git commit: This closes #3318
Posted by al...@apache.org.
This closes #3318
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c9620834
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c9620834
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c9620834
Branch: refs/heads/master
Commit: c96208347c662b3981ca2b622cad92637d4df3a2
Parents: dd9abc3 0b600d2
Author: Ahmet Altay <al...@google.com>
Authored: Wed Jun 14 16:47:23 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Jun 14 16:47:23 2017 -0700
----------------------------------------------------------------------
.../apache_beam/options/pipeline_options.py | 9 --
.../apache_beam/runners/direct/direct_runner.py | 28 ++++++
sdks/python/apache_beam/transforms/core.py | 89 +++++++++++---------
3 files changed, 79 insertions(+), 47 deletions(-)
----------------------------------------------------------------------