You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/04/20 15:55:19 UTC
[1/6] beam git commit: Make stage names consistent.
Repository: beam
Updated Branches:
refs/heads/master 4e0d5f596 -> 7019aa70d
Make stage names consistent.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/72f50209
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/72f50209
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/72f50209
Branch: refs/heads/master
Commit: 72f502091c2c3e534f41b3fbd211c2a701e89eba
Parents: 4e0d5f5
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Apr 11 10:29:15 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Apr 20 08:55:02 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/gcp/bigquery.py | 6 +++---
.../io/gcp/datastore/v1/datastoreio.py | 6 +++---
sdks/python/apache_beam/io/iobase.py | 2 +-
sdks/python/apache_beam/pipeline.py | 4 ++--
sdks/python/apache_beam/transforms/core.py | 19 ++++++++++---------
sdks/python/apache_beam/transforms/ptransform.py | 2 +-
.../apache_beam/transforms/ptransform_test.py | 4 ++--
7 files changed, 22 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/io/gcp/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 4686518..891f62a 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -45,11 +45,11 @@ call *one* row of the main table and *all* rows of the side table. The runner
may use some caching techniques to share the side inputs between calls in order
to avoid excessive reading:::
- main_table = pipeline | 'very_big' >> beam.io.Read(beam.io.BigQuerySource()
- side_table = pipeline | 'not_big' >> beam.io.Read(beam.io.BigQuerySource()
+ main_table = pipeline | 'VeryBig' >> beam.io.Read(beam.io.BigQuerySource()
+ side_table = pipeline | 'NotBig' >> beam.io.Read(beam.io.BigQuerySource()
results = (
main_table
- | 'process data' >> beam.Map(
+ | 'ProcessData' >> beam.Map(
lambda element, side_input: ..., AsList(side_table)))
There is no difference in how main and side inputs are read. What makes the
http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
index d9b3598..a0ccbbb 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
@@ -137,15 +137,15 @@ class ReadFromDatastore(PTransform):
# outputs a ``PCollection[Entity]``.
queries = (pcoll.pipeline
- | 'User Query' >> Create([self._query])
- | 'Split Query' >> ParDo(ReadFromDatastore.SplitQueryFn(
+ | 'UserQuery' >> Create([self._query])
+ | 'SplitQuery' >> ParDo(ReadFromDatastore.SplitQueryFn(
self._project, self._query, self._datastore_namespace,
self._num_splits)))
sharded_queries = (queries
| GroupByKey()
| Values()
- | 'flatten' >> FlatMap(lambda x: x))
+ | 'Flatten' >> FlatMap(lambda x: x))
entities = sharded_queries | 'Read' >> ParDo(
ReadFromDatastore.ReadFn(self._project, self._datastore_namespace))
http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index d9df5c4..2cac67f 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -903,7 +903,7 @@ class WriteImpl(ptransform.PTransform):
| core.WindowInto(window.GlobalWindows())
| core.GroupByKey()
| 'Extract' >> core.FlatMap(lambda x: x[1]))
- return do_once | 'finalize_write' >> core.FlatMap(
+ return do_once | 'FinalizeWrite' >> core.FlatMap(
_finalize_write,
self.sink,
AsSingleton(init_result_coll),
http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 2ff9eb3..8e811bc 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -32,11 +32,11 @@ Typical usage:
# Add to the pipeline a "Create" transform. When executed this
# transform will produce a PCollection object with the specified values.
- pcoll = p | 'create' >> beam.Create([1, 2, 3])
+ pcoll = p | 'Create' >> beam.Create([1, 2, 3])
# Another transform could be applied to pcoll, e.g., writing to a text file.
# For other transforms, refer to transforms/ directory.
- pcoll | 'write' >> beam.io.WriteToText('./output')
+ pcoll | 'Write' >> beam.io.WriteToText('./output')
# run() will execute the DAG stored in the pipeline. The execution of the
# nodes visited is done using the specified local runner.
http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index bdfddbb..2d28eec 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1098,21 +1098,22 @@ class GroupByKey(PTransform):
# pylint: disable=bad-continuation
return (pcoll
- | 'reify_windows' >> (ParDo(self.ReifyWindows())
+ | 'ReifyWindows' >> (ParDo(self.ReifyWindows())
.with_output_types(reify_output_type))
- | 'group_by_key' >> (GroupByKeyOnly()
+ | 'GroupByKey' >> (GroupByKeyOnly()
.with_input_types(reify_output_type)
.with_output_types(gbk_input_type))
- | ('group_by_window' >> ParDo(
+ | ('GroupByWindow' >> ParDo(
self.GroupAlsoByWindow(pcoll.windowing))
.with_input_types(gbk_input_type)
.with_output_types(gbk_output_type)))
- # If the input_type is None, run the default
- return (pcoll
- | 'reify_windows' >> ParDo(self.ReifyWindows())
- | 'group_by_key' >> GroupByKeyOnly()
- | 'group_by_window' >> ParDo(
- self.GroupAlsoByWindow(pcoll.windowing)))
+ else:
+ # The input_type is None, run the default
+ return (pcoll
+ | 'ReifyWindows' >> ParDo(self.ReifyWindows())
+ | 'GroupByKey' >> GroupByKeyOnly()
+ | 'GroupByWindow' >> ParDo(
+ self.GroupAlsoByWindow(pcoll.windowing)))
@typehints.with_input_types(typehints.KV[K, V])
http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 9b7a37f..e2c4428 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -629,7 +629,7 @@ def ptransform_fn(fn):
With either method the custom PTransform can be used in pipelines as if
it were one of the "native" PTransforms::
- result_pcoll = input_pcoll | 'label' >> CustomMapper(somefn)
+ result_pcoll = input_pcoll | 'Label' >> CustomMapper(somefn)
Note that for both solutions the underlying implementation of the pipe
operator (i.e., `|`) will inject the pcoll argument in its proper place
http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index b92af83..78277c2 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -959,8 +959,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# If this type-checks than no error should be raised.
d = (self.p
- | 'bools' >> beam.Create([True, False, True]).with_output_types(bool)
- | 'to_ints' >> beam.Map(bool_to_int))
+ | 'Bools' >> beam.Create([True, False, True]).with_output_types(bool)
+ | 'ToInts' >> beam.Map(bool_to_int))
assert_that(d, equal_to([1, 0, 1]))
self.p.run()
[2/6] beam git commit: Require deterministic window coders.
Posted by ro...@apache.org.
Require deterministic window coders.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d98294c2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d98294c2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d98294c2
Branch: refs/heads/master
Commit: d98294c2bd13b45522ea584485bd62e900144c88
Parents: 72f5020
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Apr 11 10:49:13 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Apr 20 08:55:03 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/coders/coders.py | 16 ----------------
.../apache_beam/coders/coders_test_common.py | 1 -
sdks/python/apache_beam/transforms/core.py | 4 ++++
sdks/python/apache_beam/transforms/window.py | 18 +++++++++++++++++-
4 files changed, 21 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d98294c2/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 8ef0a46..4f75182 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -688,22 +688,6 @@ class IterableCoder(FastCoder):
return hash((type(self), self._elem_coder))
-class WindowCoder(PickleCoder):
- """Coder for windows in windowed values."""
-
- def _create_impl(self):
- return coder_impl.CallbackCoderImpl(pickle.dumps, pickle.loads)
-
- def is_deterministic(self):
- # Note that WindowCoder as implemented is not deterministic because the
- # implementation simply pickles windows. See the corresponding comments
- # on PickleCoder for more details.
- return False
-
- def as_cloud_object(self):
- return super(WindowCoder, self).as_cloud_object(is_pair_like=False)
-
-
class GlobalWindowCoder(SingletonCoder):
"""Coder for global windows."""
http://git-wip-us.apache.org/repos/asf/beam/blob/d98294c2/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 6491ea8..da0bde3 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -62,7 +62,6 @@ class CodersTest(unittest.TestCase):
coders.FastCoder,
coders.ProtoCoder,
coders.ToStringCoder,
- coders.WindowCoder,
coders.IntervalWindowCoder])
assert not standard - cls.seen, standard - cls.seen
assert not standard - cls.seen_nested, standard - cls.seen_nested
http://git-wip-us.apache.org/repos/asf/beam/blob/d98294c2/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 2d28eec..9f66c39 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1185,6 +1185,10 @@ class Windowing(object):
else:
raise ValueError(
'accumulation_mode must be provided for non-trivial triggers')
+ if not windowfn.get_window_coder().is_deterministic():
+ raise ValueError(
+ 'window fn (%s) does not have a determanistic coder (%s)' % (
+ window_fn, windowfn.get_window_coder()))
self.windowfn = windowfn
self.triggerfn = triggerfn
self.accumulation_mode = accumulation_mode
http://git-wip-us.apache.org/repos/asf/beam/blob/d98294c2/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index 931a17d..643cb99 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -49,6 +49,8 @@ WindowFn.
from __future__ import absolute_import
+import abc
+
from google.protobuf import struct_pb2
from google.protobuf import wrappers_pb2
@@ -93,6 +95,8 @@ class OutputTimeFn(object):
class WindowFn(object):
"""An abstract windowing function defining a basic assign and merge."""
+ __metaclass__ = abc.ABCMeta
+
class AssignContext(object):
"""Context passed to WindowFn.assign()."""
@@ -100,6 +104,7 @@ class WindowFn(object):
self.timestamp = Timestamp.of(timestamp)
self.element = element
+ @abc.abstractmethod
def assign(self, assign_context):
"""Associates a timestamp to an element."""
raise NotImplementedError
@@ -113,6 +118,7 @@ class WindowFn(object):
def merge(self, to_be_merged, merge_result):
raise NotImplementedError
+ @abc.abstractmethod
def merge(self, merge_context):
"""Returns a window that is the result of merging a set of windows."""
raise NotImplementedError
@@ -121,8 +127,9 @@ class WindowFn(object):
"""Returns whether this WindowFn merges windows."""
return True
+ @abc.abstractmethod
def get_window_coder(self):
- return coders.WindowCoder()
+ raise NotImplementedError
def get_transformed_output_time(self, window, input_timestamp): # pylint: disable=unused-argument
"""Given input time and output window, returns output time for window.
@@ -344,6 +351,9 @@ class FixedWindows(NonMergingWindowFn):
start = timestamp - (timestamp - self.offset) % self.size
return [IntervalWindow(start, start + self.size)]
+ def get_window_coder(self):
+ return coders.IntervalWindowCoder()
+
def __eq__(self, other):
if type(self) == type(other) == FixedWindows:
return self.size == other.size and self.offset == other.offset
@@ -398,6 +408,9 @@ class SlidingWindows(NonMergingWindowFn):
for s in range(start.micros, timestamp.micros - self.size.micros,
-self.period.micros)]
+ def get_window_coder(self):
+ return coders.IntervalWindowCoder()
+
def __eq__(self, other):
if type(self) == type(other) == SlidingWindows:
return (self.size == other.size
@@ -443,6 +456,9 @@ class Sessions(WindowFn):
timestamp = context.timestamp
return [IntervalWindow(timestamp, timestamp + self.gap_size)]
+ def get_window_coder(self):
+ return coders.IntervalWindowCoder()
+
def merge(self, merge_context):
to_merge = []
end = timeutil.MIN_TIMESTAMP
[6/6] beam git commit: Closes #2552
Posted by ro...@apache.org.
Closes #2552
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7019aa70
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7019aa70
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7019aa70
Branch: refs/heads/master
Commit: 7019aa70db62575ae275ae93155bb0903a1ff26e
Parents: 4e0d5f5 2c34719
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Apr 20 08:55:06 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Apr 20 08:55:06 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/coders/coders.py | 16 ---------
.../apache_beam/coders/coders_test_common.py | 7 ++--
sdks/python/apache_beam/io/gcp/bigquery.py | 6 ++--
.../io/gcp/datastore/v1/datastoreio.py | 6 ++--
sdks/python/apache_beam/io/iobase.py | 2 +-
sdks/python/apache_beam/pipeline.py | 4 +--
sdks/python/apache_beam/runners/runner.py | 34 --------------------
sdks/python/apache_beam/transforms/core.py | 23 +++++++------
.../python/apache_beam/transforms/ptransform.py | 2 +-
.../apache_beam/transforms/ptransform_test.py | 4 +--
sdks/python/apache_beam/transforms/trigger.py | 10 +++---
.../apache_beam/transforms/trigger_test.py | 10 +++---
sdks/python/apache_beam/transforms/window.py | 18 ++++++++++-
13 files changed, 58 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
[5/6] beam git commit: Rename AfterFirst to AfterAny for consistency with Java.
Posted by ro...@apache.org.
Rename AfterFirst to AfterAny for consistency with Java.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2c347192
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2c347192
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2c347192
Branch: refs/heads/master
Commit: 2c347192709e4076a1ac59b5ea769d4d3b2494f4
Parents: 4b9029a
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Apr 11 11:00:04 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Apr 20 08:55:05 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/transforms/trigger.py | 10 ++++++----
sdks/python/apache_beam/transforms/trigger_test.py | 10 +++++-----
2 files changed, 11 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/2c347192/sdks/python/apache_beam/transforms/trigger.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index 6a4cf24..b9786f4 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -186,7 +186,7 @@ class TriggerFn(object):
def from_runner_api(proto, context):
return {
'after_all': AfterAll,
- 'after_any': AfterFirst,
+ 'after_any': AfterAny,
'after_each': AfterEach,
'after_end_of_window': AfterWatermark,
# after_processing_time, after_synchronized_processing_time
@@ -488,7 +488,8 @@ class ParallelTriggerFn(TriggerFn):
in proto.after_all.subtriggers or proto.after_any.subtriggers]
if proto.after_all.subtriggers:
return AfterAll(*subtriggers)
- return AfterFirst(*subtriggers)
+ else:
+ return AfterAny(*subtriggers)
def to_runner_api(self, context):
subtriggers = [
@@ -505,7 +506,7 @@ class ParallelTriggerFn(TriggerFn):
raise NotImplementedError(self)
-class AfterFirst(ParallelTriggerFn):
+class AfterAny(ParallelTriggerFn):
"""Fires when any subtrigger fires.
Also finishes when any subtrigger finishes.
@@ -589,7 +590,8 @@ class AfterEach(TriggerFn):
for subtrigger in self.triggers]))
-class OrFinally(AfterFirst):
+class OrFinally(AfterAny):
+
@staticmethod
def from_runner_api(proto, context):
return OrFinally(
http://git-wip-us.apache.org/repos/asf/beam/blob/2c347192/sdks/python/apache_beam/transforms/trigger_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index 9f2046a..914babb 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -33,7 +33,7 @@ from apache_beam.transforms.trigger import AccumulationMode
from apache_beam.transforms.trigger import AfterAll
from apache_beam.transforms.trigger import AfterCount
from apache_beam.transforms.trigger import AfterEach
-from apache_beam.transforms.trigger import AfterFirst
+from apache_beam.transforms.trigger import AfterAny
from apache_beam.transforms.trigger import AfterWatermark
from apache_beam.transforms.trigger import DefaultTrigger
from apache_beam.transforms.trigger import GeneralTriggerDriver
@@ -217,7 +217,7 @@ class TriggerTest(unittest.TestCase):
def test_fixed_after_first(self):
self.run_trigger_simple(
FixedWindows(10), # pyformat break
- AfterFirst(AfterCount(2), AfterWatermark()),
+ AfterAny(AfterCount(2), AfterWatermark()),
AccumulationMode.ACCUMULATING,
[(1, 'a'), (2, 'b'), (3, 'c')],
{IntervalWindow(0, 10): [set('ab')]},
@@ -225,7 +225,7 @@ class TriggerTest(unittest.TestCase):
2)
self.run_trigger_simple(
FixedWindows(10), # pyformat break
- AfterFirst(AfterCount(5), AfterWatermark()),
+ AfterAny(AfterCount(5), AfterWatermark()),
AccumulationMode.ACCUMULATING,
[(1, 'a'), (2, 'b'), (3, 'c')],
{IntervalWindow(0, 10): [set('abc')]},
@@ -236,7 +236,7 @@ class TriggerTest(unittest.TestCase):
def test_repeatedly_after_first(self):
self.run_trigger_simple(
FixedWindows(100), # pyformat break
- Repeatedly(AfterFirst(AfterCount(3), AfterWatermark())),
+ Repeatedly(AfterAny(AfterCount(3), AfterWatermark())),
AccumulationMode.ACCUMULATING,
zip(range(7), 'abcdefg'),
{IntervalWindow(0, 100): [
@@ -388,7 +388,7 @@ class RunnerApiTest(unittest.TestCase):
for trigger_fn in (
DefaultTrigger(),
AfterAll(AfterCount(1), AfterCount(10)),
- AfterFirst(AfterCount(10), AfterCount(100)),
+ AfterAny(AfterCount(10), AfterCount(100)),
AfterWatermark(early=AfterCount(1000)),
AfterWatermark(early=AfterCount(1000), late=AfterCount(1)),
Repeatedly(AfterCount(100)),
[3/6] beam git commit: Remove obsolete and unused Runner.clear
Posted by ro...@apache.org.
Remove obsolete and unused Runner.clear
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4b9029ac
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4b9029ac
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4b9029ac
Branch: refs/heads/master
Commit: 4b9029ac3e965ed3629833138597f1fb365b0876
Parents: 68c0042
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Apr 11 10:57:35 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Apr 20 08:55:04 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/runners/runner.py | 34 --------------------------
1 file changed, 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/4b9029ac/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index ccb066b..b35cb7b 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -99,8 +99,6 @@ class PipelineRunner(object):
The base runner provides a run() method for visiting every node in the
pipeline's DAG and executing the transforms computing the PValue in the node.
- It also provides a clear() method for visiting every node and clearing out
- the values contained in PValue objects produced during a run.
A custom runner will typically provide implementations for some of the
transform methods (ParDo, GroupByKey, Create, etc.). It may also
@@ -129,38 +127,6 @@ class PipelineRunner(object):
pipeline.visit(RunVisitor(self))
- def clear(self, pipeline, node=None):
- """Clear all nodes or nodes reachable from node of materialized values.
-
- Args:
- pipeline: Pipeline object containing PValues to be cleared.
- node: Optional node in the Pipeline processing DAG. If specified only
- nodes reachable from this node will be cleared (ancestors of the node).
-
- This method is not intended (for now) to be called by users of Runner
- objects. It is a hook for future layers on top of the current programming
- model to control how much of the previously computed values are kept
- around. Presumably an interactivity layer will use it. The simplest way
- to change the behavior would be to define a runner that overwrites the
- clear_pvalue() method since this method (runner.clear) will visit all
- relevant nodes and call clear_pvalue on them.
-
- """
-
- # Imported here to avoid circular dependencies.
- # pylint: disable=wrong-import-order, wrong-import-position
- from apache_beam.pipeline import PipelineVisitor
-
- class ClearVisitor(PipelineVisitor):
-
- def __init__(self, runner):
- self.runner = runner
-
- def visit_value(self, value, _):
- self.runner.clear_pvalue(value)
-
- pipeline.visit(ClearVisitor(self), node=node)
-
def apply(self, transform, input):
"""Runner callback for a pipeline.apply call.
[4/6] beam git commit: Enable IntervalWindowCoder test check.
Posted by ro...@apache.org.
Enable IntervalWindowCoder test check.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/68c00426
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/68c00426
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/68c00426
Branch: refs/heads/master
Commit: 68c00426261c54a32be509f9ab53cf3543d49d78
Parents: d98294c
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Apr 11 10:52:44 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Apr 20 08:55:04 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/coders/coders_test_common.py | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/68c00426/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index da0bde3..e5bfe35 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -61,8 +61,7 @@ class CodersTest(unittest.TestCase):
standard -= set([coders.Coder,
coders.FastCoder,
coders.ProtoCoder,
- coders.ToStringCoder,
- coders.IntervalWindowCoder])
+ coders.ToStringCoder])
assert not standard - cls.seen, standard - cls.seen
assert not standard - cls.seen_nested, standard - cls.seen_nested
@@ -171,6 +170,9 @@ class CodersTest(unittest.TestCase):
*[window.IntervalWindow(x, y)
for x in [-2**52, 0, 2**52]
for y in range(-100, 100)])
+ self.check_coder(
+ coders.TupleCoder((coders.IntervalWindowCoder(),)),
+ (window.IntervalWindow(0, 10),))
def test_timestamp_coder(self):
self.check_coder(coders.TimestampCoder(),