You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/04/20 15:55:19 UTC

[1/6] beam git commit: Make stage names consistent.

Repository: beam
Updated Branches:
  refs/heads/master 4e0d5f596 -> 7019aa70d


Make stage names consistent.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/72f50209
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/72f50209
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/72f50209

Branch: refs/heads/master
Commit: 72f502091c2c3e534f41b3fbd211c2a701e89eba
Parents: 4e0d5f5
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Apr 11 10:29:15 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Apr 20 08:55:02 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/gcp/bigquery.py       |  6 +++---
 .../io/gcp/datastore/v1/datastoreio.py           |  6 +++---
 sdks/python/apache_beam/io/iobase.py             |  2 +-
 sdks/python/apache_beam/pipeline.py              |  4 ++--
 sdks/python/apache_beam/transforms/core.py       | 19 ++++++++++---------
 sdks/python/apache_beam/transforms/ptransform.py |  2 +-
 .../apache_beam/transforms/ptransform_test.py    |  4 ++--
 7 files changed, 22 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/io/gcp/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 4686518..891f62a 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -45,11 +45,11 @@ call *one* row of the main table and *all* rows of the side table. The runner
 may use some caching techniques to share the side inputs between calls in order
 to avoid excessive reading:::
 
-  main_table = pipeline | 'very_big' >> beam.io.Read(beam.io.BigQuerySource()
-  side_table = pipeline | 'not_big' >> beam.io.Read(beam.io.BigQuerySource()
+  main_table = pipeline | 'VeryBig' >> beam.io.Read(beam.io.BigQuerySource()
+  side_table = pipeline | 'NotBig' >> beam.io.Read(beam.io.BigQuerySource()
   results = (
       main_table
-      | 'process data' >> beam.Map(
+      | 'ProcessData' >> beam.Map(
           lambda element, side_input: ..., AsList(side_table)))
 
 There is no difference in how main and side inputs are read. What makes the

http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
index d9b3598..a0ccbbb 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
@@ -137,15 +137,15 @@ class ReadFromDatastore(PTransform):
     #   outputs a ``PCollection[Entity]``.
 
     queries = (pcoll.pipeline
-               | 'User Query' >> Create([self._query])
-               | 'Split Query' >> ParDo(ReadFromDatastore.SplitQueryFn(
+               | 'UserQuery' >> Create([self._query])
+               | 'SplitQuery' >> ParDo(ReadFromDatastore.SplitQueryFn(
                    self._project, self._query, self._datastore_namespace,
                    self._num_splits)))
 
     sharded_queries = (queries
                        | GroupByKey()
                        | Values()
-                       | 'flatten' >> FlatMap(lambda x: x))
+                       | 'Flatten' >> FlatMap(lambda x: x))
 
     entities = sharded_queries | 'Read' >> ParDo(
         ReadFromDatastore.ReadFn(self._project, self._datastore_namespace))

http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index d9df5c4..2cac67f 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -903,7 +903,7 @@ class WriteImpl(ptransform.PTransform):
                            | core.WindowInto(window.GlobalWindows())
                            | core.GroupByKey()
                            | 'Extract' >> core.FlatMap(lambda x: x[1]))
-    return do_once | 'finalize_write' >> core.FlatMap(
+    return do_once | 'FinalizeWrite' >> core.FlatMap(
         _finalize_write,
         self.sink,
         AsSingleton(init_result_coll),

http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 2ff9eb3..8e811bc 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -32,11 +32,11 @@ Typical usage:
 
   # Add to the pipeline a "Create" transform. When executed this
   # transform will produce a PCollection object with the specified values.
-  pcoll = p | 'create' >> beam.Create([1, 2, 3])
+  pcoll = p | 'Create' >> beam.Create([1, 2, 3])
 
   # Another transform could be applied to pcoll, e.g., writing to a text file.
   # For other transforms, refer to transforms/ directory.
-  pcoll | 'write' >> beam.io.WriteToText('./output')
+  pcoll | 'Write' >> beam.io.WriteToText('./output')
 
   # run() will execute the DAG stored in the pipeline.  The execution of the
   # nodes visited is done using the specified local runner.

http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index bdfddbb..2d28eec 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1098,21 +1098,22 @@ class GroupByKey(PTransform):
 
       # pylint: disable=bad-continuation
       return (pcoll
-              | 'reify_windows' >> (ParDo(self.ReifyWindows())
+              | 'ReifyWindows' >> (ParDo(self.ReifyWindows())
                  .with_output_types(reify_output_type))
-              | 'group_by_key' >> (GroupByKeyOnly()
+              | 'GroupByKey' >> (GroupByKeyOnly()
                  .with_input_types(reify_output_type)
                  .with_output_types(gbk_input_type))
-              | ('group_by_window' >> ParDo(
+              | ('GroupByWindow' >> ParDo(
                      self.GroupAlsoByWindow(pcoll.windowing))
                  .with_input_types(gbk_input_type)
                  .with_output_types(gbk_output_type)))
-    # If the input_type is None, run the default
-    return (pcoll
-            | 'reify_windows' >> ParDo(self.ReifyWindows())
-            | 'group_by_key' >> GroupByKeyOnly()
-            | 'group_by_window' >> ParDo(
-                self.GroupAlsoByWindow(pcoll.windowing)))
+    else:
+      # The input_type is None, run the default
+      return (pcoll
+              | 'ReifyWindows' >> ParDo(self.ReifyWindows())
+              | 'GroupByKey' >> GroupByKeyOnly()
+              | 'GroupByWindow' >> ParDo(
+                    self.GroupAlsoByWindow(pcoll.windowing)))
 
 
 @typehints.with_input_types(typehints.KV[K, V])

http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 9b7a37f..e2c4428 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -629,7 +629,7 @@ def ptransform_fn(fn):
   With either method the custom PTransform can be used in pipelines as if
   it were one of the "native" PTransforms::
 
-    result_pcoll = input_pcoll | 'label' >> CustomMapper(somefn)
+    result_pcoll = input_pcoll | 'Label' >> CustomMapper(somefn)
 
   Note that for both solutions the underlying implementation of the pipe
   operator (i.e., `|`) will inject the pcoll argument in its proper place

http://git-wip-us.apache.org/repos/asf/beam/blob/72f50209/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index b92af83..78277c2 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -959,8 +959,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     # If this type-checks then no error should be raised.
     d = (self.p
-         | 'bools' >> beam.Create([True, False, True]).with_output_types(bool)
-         | 'to_ints' >> beam.Map(bool_to_int))
+         | 'Bools' >> beam.Create([True, False, True]).with_output_types(bool)
+         | 'ToInts' >> beam.Map(bool_to_int))
     assert_that(d, equal_to([1, 0, 1]))
     self.p.run()
 


[2/6] beam git commit: Require deterministic window coders.

Posted by ro...@apache.org.
Require deterministic window coders.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d98294c2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d98294c2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d98294c2

Branch: refs/heads/master
Commit: d98294c2bd13b45522ea584485bd62e900144c88
Parents: 72f5020
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Apr 11 10:49:13 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Apr 20 08:55:03 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coders.py          | 16 ----------------
 .../apache_beam/coders/coders_test_common.py      |  1 -
 sdks/python/apache_beam/transforms/core.py        |  4 ++++
 sdks/python/apache_beam/transforms/window.py      | 18 +++++++++++++++++-
 4 files changed, 21 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d98294c2/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 8ef0a46..4f75182 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -688,22 +688,6 @@ class IterableCoder(FastCoder):
     return hash((type(self), self._elem_coder))
 
 
-class WindowCoder(PickleCoder):
-  """Coder for windows in windowed values."""
-
-  def _create_impl(self):
-    return coder_impl.CallbackCoderImpl(pickle.dumps, pickle.loads)
-
-  def is_deterministic(self):
-    # Note that WindowCoder as implemented is not deterministic because the
-    # implementation simply pickles windows.  See the corresponding comments
-    # on PickleCoder for more details.
-    return False
-
-  def as_cloud_object(self):
-    return super(WindowCoder, self).as_cloud_object(is_pair_like=False)
-
-
 class GlobalWindowCoder(SingletonCoder):
   """Coder for global windows."""
 

http://git-wip-us.apache.org/repos/asf/beam/blob/d98294c2/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 6491ea8..da0bde3 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -62,7 +62,6 @@ class CodersTest(unittest.TestCase):
                      coders.FastCoder,
                      coders.ProtoCoder,
                      coders.ToStringCoder,
-                     coders.WindowCoder,
                      coders.IntervalWindowCoder])
     assert not standard - cls.seen, standard - cls.seen
     assert not standard - cls.seen_nested, standard - cls.seen_nested

http://git-wip-us.apache.org/repos/asf/beam/blob/d98294c2/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 2d28eec..9f66c39 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1185,6 +1185,10 @@ class Windowing(object):
       else:
         raise ValueError(
             'accumulation_mode must be provided for non-trivial triggers')
+    if not windowfn.get_window_coder().is_deterministic():
+      raise ValueError(
+          'window fn (%s) does not have a determanistic coder (%s)' % (
+              window_fn, windowfn.get_window_coder()))
     self.windowfn = windowfn
     self.triggerfn = triggerfn
     self.accumulation_mode = accumulation_mode

http://git-wip-us.apache.org/repos/asf/beam/blob/d98294c2/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index 931a17d..643cb99 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -49,6 +49,8 @@ WindowFn.
 
 from __future__ import absolute_import
 
+import abc
+
 from google.protobuf import struct_pb2
 from google.protobuf import wrappers_pb2
 
@@ -93,6 +95,8 @@ class OutputTimeFn(object):
 class WindowFn(object):
   """An abstract windowing function defining a basic assign and merge."""
 
+  __metaclass__ = abc.ABCMeta
+
   class AssignContext(object):
     """Context passed to WindowFn.assign()."""
 
@@ -100,6 +104,7 @@ class WindowFn(object):
       self.timestamp = Timestamp.of(timestamp)
       self.element = element
 
+  @abc.abstractmethod
   def assign(self, assign_context):
     """Associates a timestamp to an element."""
     raise NotImplementedError
@@ -113,6 +118,7 @@ class WindowFn(object):
     def merge(self, to_be_merged, merge_result):
       raise NotImplementedError
 
+  @abc.abstractmethod
   def merge(self, merge_context):
     """Returns a window that is the result of merging a set of windows."""
     raise NotImplementedError
@@ -121,8 +127,9 @@ class WindowFn(object):
     """Returns whether this WindowFn merges windows."""
     return True
 
+  @abc.abstractmethod
   def get_window_coder(self):
-    return coders.WindowCoder()
+    raise NotImplementedError
 
   def get_transformed_output_time(self, window, input_timestamp):  # pylint: disable=unused-argument
     """Given input time and output window, returns output time for window.
@@ -344,6 +351,9 @@ class FixedWindows(NonMergingWindowFn):
     start = timestamp - (timestamp - self.offset) % self.size
     return [IntervalWindow(start, start + self.size)]
 
+  def get_window_coder(self):
+    return coders.IntervalWindowCoder()
+
   def __eq__(self, other):
     if type(self) == type(other) == FixedWindows:
       return self.size == other.size and self.offset == other.offset
@@ -398,6 +408,9 @@ class SlidingWindows(NonMergingWindowFn):
         for s in range(start.micros, timestamp.micros - self.size.micros,
                        -self.period.micros)]
 
+  def get_window_coder(self):
+    return coders.IntervalWindowCoder()
+
   def __eq__(self, other):
     if type(self) == type(other) == SlidingWindows:
       return (self.size == other.size
@@ -443,6 +456,9 @@ class Sessions(WindowFn):
     timestamp = context.timestamp
     return [IntervalWindow(timestamp, timestamp + self.gap_size)]
 
+  def get_window_coder(self):
+    return coders.IntervalWindowCoder()
+
   def merge(self, merge_context):
     to_merge = []
     end = timeutil.MIN_TIMESTAMP


[6/6] beam git commit: Closes #2552

Posted by ro...@apache.org.
Closes #2552


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7019aa70
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7019aa70
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7019aa70

Branch: refs/heads/master
Commit: 7019aa70db62575ae275ae93155bb0903a1ff26e
Parents: 4e0d5f5 2c34719
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Apr 20 08:55:06 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Apr 20 08:55:06 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coders.py        | 16 ---------
 .../apache_beam/coders/coders_test_common.py    |  7 ++--
 sdks/python/apache_beam/io/gcp/bigquery.py      |  6 ++--
 .../io/gcp/datastore/v1/datastoreio.py          |  6 ++--
 sdks/python/apache_beam/io/iobase.py            |  2 +-
 sdks/python/apache_beam/pipeline.py             |  4 +--
 sdks/python/apache_beam/runners/runner.py       | 34 --------------------
 sdks/python/apache_beam/transforms/core.py      | 23 +++++++------
 .../python/apache_beam/transforms/ptransform.py |  2 +-
 .../apache_beam/transforms/ptransform_test.py   |  4 +--
 sdks/python/apache_beam/transforms/trigger.py   | 10 +++---
 .../apache_beam/transforms/trigger_test.py      | 10 +++---
 sdks/python/apache_beam/transforms/window.py    | 18 ++++++++++-
 13 files changed, 58 insertions(+), 84 deletions(-)
----------------------------------------------------------------------



[5/6] beam git commit: Rename AfterFirst to AfterAny for consistency with Java.

Posted by ro...@apache.org.
Rename AfterFirst to AfterAny for consistency with Java.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2c347192
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2c347192
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2c347192

Branch: refs/heads/master
Commit: 2c347192709e4076a1ac59b5ea769d4d3b2494f4
Parents: 4b9029a
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Apr 11 11:00:04 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Apr 20 08:55:05 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/transforms/trigger.py      | 10 ++++++----
 sdks/python/apache_beam/transforms/trigger_test.py | 10 +++++-----
 2 files changed, 11 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2c347192/sdks/python/apache_beam/transforms/trigger.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index 6a4cf24..b9786f4 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -186,7 +186,7 @@ class TriggerFn(object):
   def from_runner_api(proto, context):
     return {
         'after_all': AfterAll,
-        'after_any': AfterFirst,
+        'after_any': AfterAny,
         'after_each': AfterEach,
         'after_end_of_window': AfterWatermark,
         # after_processing_time, after_synchronized_processing_time
@@ -488,7 +488,8 @@ class ParallelTriggerFn(TriggerFn):
         in proto.after_all.subtriggers or proto.after_any.subtriggers]
     if proto.after_all.subtriggers:
       return AfterAll(*subtriggers)
-    return AfterFirst(*subtriggers)
+    else:
+      return AfterAny(*subtriggers)
 
   def to_runner_api(self, context):
     subtriggers = [
@@ -505,7 +506,7 @@ class ParallelTriggerFn(TriggerFn):
       raise NotImplementedError(self)
 
 
-class AfterFirst(ParallelTriggerFn):
+class AfterAny(ParallelTriggerFn):
   """Fires when any subtrigger fires.
 
   Also finishes when any subtrigger finishes.
@@ -589,7 +590,8 @@ class AfterEach(TriggerFn):
                 for subtrigger in self.triggers]))
 
 
-class OrFinally(AfterFirst):
+class OrFinally(AfterAny):
+
   @staticmethod
   def from_runner_api(proto, context):
     return OrFinally(

http://git-wip-us.apache.org/repos/asf/beam/blob/2c347192/sdks/python/apache_beam/transforms/trigger_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index 9f2046a..914babb 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -33,7 +33,7 @@ from apache_beam.transforms.trigger import AccumulationMode
 from apache_beam.transforms.trigger import AfterAll
 from apache_beam.transforms.trigger import AfterCount
 from apache_beam.transforms.trigger import AfterEach
-from apache_beam.transforms.trigger import AfterFirst
+from apache_beam.transforms.trigger import AfterAny
 from apache_beam.transforms.trigger import AfterWatermark
 from apache_beam.transforms.trigger import DefaultTrigger
 from apache_beam.transforms.trigger import GeneralTriggerDriver
@@ -217,7 +217,7 @@ class TriggerTest(unittest.TestCase):
   def test_fixed_after_first(self):
     self.run_trigger_simple(
         FixedWindows(10),  # pyformat break
-        AfterFirst(AfterCount(2), AfterWatermark()),
+        AfterAny(AfterCount(2), AfterWatermark()),
         AccumulationMode.ACCUMULATING,
         [(1, 'a'), (2, 'b'), (3, 'c')],
         {IntervalWindow(0, 10): [set('ab')]},
@@ -225,7 +225,7 @@ class TriggerTest(unittest.TestCase):
         2)
     self.run_trigger_simple(
         FixedWindows(10),  # pyformat break
-        AfterFirst(AfterCount(5), AfterWatermark()),
+        AfterAny(AfterCount(5), AfterWatermark()),
         AccumulationMode.ACCUMULATING,
         [(1, 'a'), (2, 'b'), (3, 'c')],
         {IntervalWindow(0, 10): [set('abc')]},
@@ -236,7 +236,7 @@ class TriggerTest(unittest.TestCase):
   def test_repeatedly_after_first(self):
     self.run_trigger_simple(
         FixedWindows(100),  # pyformat break
-        Repeatedly(AfterFirst(AfterCount(3), AfterWatermark())),
+        Repeatedly(AfterAny(AfterCount(3), AfterWatermark())),
         AccumulationMode.ACCUMULATING,
         zip(range(7), 'abcdefg'),
         {IntervalWindow(0, 100): [
@@ -388,7 +388,7 @@ class RunnerApiTest(unittest.TestCase):
     for trigger_fn in (
         DefaultTrigger(),
         AfterAll(AfterCount(1), AfterCount(10)),
-        AfterFirst(AfterCount(10), AfterCount(100)),
+        AfterAny(AfterCount(10), AfterCount(100)),
         AfterWatermark(early=AfterCount(1000)),
         AfterWatermark(early=AfterCount(1000), late=AfterCount(1)),
         Repeatedly(AfterCount(100)),


[3/6] beam git commit: Remove obsolete and unused Runner.clear

Posted by ro...@apache.org.
Remove obsolete and unused Runner.clear


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4b9029ac
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4b9029ac
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4b9029ac

Branch: refs/heads/master
Commit: 4b9029ac3e965ed3629833138597f1fb365b0876
Parents: 68c0042
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Apr 11 10:57:35 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Apr 20 08:55:04 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/runner.py | 34 --------------------------
 1 file changed, 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4b9029ac/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index ccb066b..b35cb7b 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -99,8 +99,6 @@ class PipelineRunner(object):
 
   The base runner provides a run() method for visiting every node in the
   pipeline's DAG and executing the transforms computing the PValue in the node.
-  It also provides a clear() method for visiting every node and clearing out
-  the values contained in PValue objects produced during a run.
 
   A custom runner will typically provide implementations for some of the
   transform methods (ParDo, GroupByKey, Create, etc.). It may also
@@ -129,38 +127,6 @@ class PipelineRunner(object):
 
     pipeline.visit(RunVisitor(self))
 
-  def clear(self, pipeline, node=None):
-    """Clear all nodes or nodes reachable from node of materialized values.
-
-    Args:
-      pipeline: Pipeline object containing PValues to be cleared.
-      node: Optional node in the Pipeline processing DAG. If specified only
-        nodes reachable from this node will be cleared (ancestors of the node).
-
-    This method is not intended (for now) to be called by users of Runner
-    objects. It is a hook for future layers on top of the current programming
-    model to control how much of the previously computed values are kept
-    around. Presumably an interactivity layer will use it. The simplest way
-    to change the behavior would be to define a runner that overwrites the
-    clear_pvalue() method since this method (runner.clear) will visit all
-    relevant nodes and call clear_pvalue on them.
-
-    """
-
-    # Imported here to avoid circular dependencies.
-    # pylint: disable=wrong-import-order, wrong-import-position
-    from apache_beam.pipeline import PipelineVisitor
-
-    class ClearVisitor(PipelineVisitor):
-
-      def __init__(self, runner):
-        self.runner = runner
-
-      def visit_value(self, value, _):
-        self.runner.clear_pvalue(value)
-
-    pipeline.visit(ClearVisitor(self), node=node)
-
   def apply(self, transform, input):
     """Runner callback for a pipeline.apply call.
 


[4/6] beam git commit: Enable IntervalWindowCoder test check.

Posted by ro...@apache.org.
Enable IntervalWindowCoder test check.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/68c00426
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/68c00426
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/68c00426

Branch: refs/heads/master
Commit: 68c00426261c54a32be509f9ab53cf3543d49d78
Parents: d98294c
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Apr 11 10:52:44 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Apr 20 08:55:04 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coders_test_common.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/68c00426/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index da0bde3..e5bfe35 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -61,8 +61,7 @@ class CodersTest(unittest.TestCase):
     standard -= set([coders.Coder,
                      coders.FastCoder,
                      coders.ProtoCoder,
-                     coders.ToStringCoder,
-                     coders.IntervalWindowCoder])
+                     coders.ToStringCoder])
     assert not standard - cls.seen, standard - cls.seen
     assert not standard - cls.seen_nested, standard - cls.seen_nested
 
@@ -171,6 +170,9 @@ class CodersTest(unittest.TestCase):
                      *[window.IntervalWindow(x, y)
                        for x in [-2**52, 0, 2**52]
                        for y in range(-100, 100)])
+    self.check_coder(
+        coders.TupleCoder((coders.IntervalWindowCoder(),)),
+        (window.IntervalWindow(0, 10),))
 
   def test_timestamp_coder(self):
     self.check_coder(coders.TimestampCoder(),