You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/07/26 17:56:06 UTC

[1/4] beam git commit: Disable abc metaclass due to issues with pickling.

Repository: beam
Updated Branches:
  refs/heads/master 69a51c988 -> b67a30bf1


Disable abc metaclass due to issues with pickling.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/24b39e45
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/24b39e45
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/24b39e45

Branch: refs/heads/master
Commit: 24b39e457519f45df55a90a6c6c92fa8df5128b7
Parents: 22e49e6
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Mon Jul 24 17:19:47 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Jul 26 10:55:53 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/utils/urns.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/24b39e45/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index c155cfd..9e4635d 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -53,7 +53,8 @@ class RunnerApiFn(object):
   to register serialization via pickling.
   """
 
-  __metaclass__ = abc.ABCMeta
+  # TODO(robertwb): Figure out issue with dill + local classes + abc metaclass
+  # __metaclass__ = abc.ABCMeta
 
   _known_urns = {}
 


[4/4] beam git commit: Closes #3635

Posted by ro...@apache.org.
Closes #3635


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b67a30bf
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b67a30bf
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b67a30bf

Branch: refs/heads/master
Commit: b67a30bf1586990c56fc19e1510417bdab71f280
Parents: 69a51c9 2d307f2
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Wed Jul 26 10:55:54 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Jul 26 10:55:54 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/gcp/pubsub.py        |  3 ++
 sdks/python/apache_beam/io/iobase.py            | 31 +++++++++++++++++++-
 .../runners/dataflow/native_io/iobase.py        |  6 +++-
 sdks/python/apache_beam/utils/urns.py           |  6 +++-
 4 files changed, 43 insertions(+), 3 deletions(-)
----------------------------------------------------------------------



[3/4] beam git commit: Create common superclass for all Sources.

Posted by ro...@apache.org.
Create common superclass for all Sources.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2d307f29
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2d307f29
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2d307f29

Branch: refs/heads/master
Commit: 2d307f29b77e6aeb28f624f50246e99422a1173d
Parents: 24b39e4
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Jul 25 13:33:03 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Jul 26 10:55:54 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/iobase.py                    | 12 ++++++++----
 .../apache_beam/runners/dataflow/native_io/iobase.py    |  6 ++----
 2 files changed, 10 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2d307f29/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index bdd06e1..db75fe3 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -72,7 +72,13 @@ SourceBundle = namedtuple(
     'weight source start_position stop_position')
 
 
-class BoundedSource(HasDisplayData, urns.RunnerApiFn):
+class SourceBase(HasDisplayData, urns.RunnerApiFn):
+  """Base class for all sources that can be passed to beam.io.Read(...).
+  """
+  urns.RunnerApiFn.register_pickle_urn(urns.PICKLED_SOURCE)
+
+
+class BoundedSource(SourceBase):
   """A source that reads a finite amount of input records.
 
   This class defines following operations which can be used to read the source
@@ -194,8 +200,6 @@ class BoundedSource(HasDisplayData, urns.RunnerApiFn):
   def is_bounded(self):
     return True
 
-  urns.RunnerApiFn.register_pickle_urn(urns.PICKLED_SOURCE)
-
 
 class RangeTracker(object):
   """A thread safe object used by Dataflow source framework.
@@ -837,7 +841,7 @@ class Read(ptransform.PTransform):
 
   @staticmethod
   def from_runner_api_parameter(parameter, context):
-    return Read(BoundedSource.from_runner_api(parameter.source, context))
+    return Read(SourceBase.from_runner_api(parameter.source, context))
 
 
 ptransform.PTransform.register_urn(

http://git-wip-us.apache.org/repos/asf/beam/blob/2d307f29/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
index f29b714..2f2316f 100644
--- a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
+++ b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
@@ -23,9 +23,9 @@ For internal use only; no backwards-compatibility guarantees.
 import logging
 
 from apache_beam import pvalue
+from apache_beam.io import iobase
 from apache_beam.transforms import ptransform
 from apache_beam.transforms.display import HasDisplayData
-from apache_beam.utils import urns
 
 
 def _dict_printable_fields(dict_object, skip_fields):
@@ -43,7 +43,7 @@ _minor_fields = ['coder', 'key_coder', 'value_coder',
                  'compression_type']
 
 
-class NativeSource(HasDisplayData, urns.RunnerApiFn):
+class NativeSource(iobase.SourceBase):
   """A source implemented by Dataflow service.
 
   This class is to be only inherited by sources natively implemented by Cloud
@@ -65,8 +65,6 @@ class NativeSource(HasDisplayData, urns.RunnerApiFn):
         vals=', '.join(_dict_printable_fields(self.__dict__,
                                               _minor_fields)))
 
-  urns.RunnerApiFn.register_pickle_urn(urns.PICKLED_SOURCE)
-
 
 class NativeSourceReader(object):
   """A reader for a source implemented by Dataflow service."""


[2/4] beam git commit: Use runner API for read operation.

Posted by ro...@apache.org.
Use runner API for read operation.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/22e49e60
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/22e49e60
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/22e49e60

Branch: refs/heads/master
Commit: 22e49e6035c54f95c604be8cf15daecba458240a
Parents: 69a51c9
Author: Robert Bradshaw <ro...@google.com>
Authored: Mon Jul 17 16:51:52 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Jul 26 10:55:53 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/gcp/pubsub.py        |  3 +++
 sdks/python/apache_beam/io/iobase.py            | 27 +++++++++++++++++++-
 .../runners/dataflow/native_io/iobase.py        |  8 +++++-
 sdks/python/apache_beam/utils/urns.py           |  3 +++
 4 files changed, 39 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/22e49e60/sdks/python/apache_beam/io/gcp/pubsub.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index 32d388a..7d1f355 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -183,6 +183,9 @@ class _PubSubPayloadSource(dataflow_io.NativeSource):
     raise NotImplementedError(
         'PubSubPayloadSource is not supported in local execution.')
 
+  def is_bounded(self):
+    return False
+
 
 class _PubSubPayloadSink(dataflow_io.NativeSink):
   """Sink for the payload of a message as bytes to a Cloud Pub/Sub topic."""

http://git-wip-us.apache.org/repos/asf/beam/blob/22e49e60/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 7e40d83..bdd06e1 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -37,6 +37,7 @@ import uuid
 
 from apache_beam import pvalue
 from apache_beam import coders
+from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.pvalue import AsIter
 from apache_beam.pvalue import AsSingleton
 from apache_beam.transforms import core
@@ -44,6 +45,7 @@ from apache_beam.transforms import ptransform
 from apache_beam.transforms import window
 from apache_beam.transforms.display import HasDisplayData
 from apache_beam.transforms.display import DisplayDataItem
+from apache_beam.utils import urns
 from apache_beam.utils.windowed_value import WindowedValue
 
 __all__ = ['BoundedSource', 'RangeTracker', 'Read', 'Sink', 'Write', 'Writer']
@@ -70,7 +72,7 @@ SourceBundle = namedtuple(
     'weight source start_position stop_position')
 
 
-class BoundedSource(HasDisplayData):
+class BoundedSource(HasDisplayData, urns.RunnerApiFn):
   """A source that reads a finite amount of input records.
 
   This class defines following operations which can be used to read the source
@@ -189,6 +191,11 @@ class BoundedSource(HasDisplayData):
     """
     return coders.registry.get_coder(object)
 
+  def is_bounded(self):
+    return True
+
+  urns.RunnerApiFn.register_pickle_urn(urns.PICKLED_SOURCE)
+
 
 class RangeTracker(object):
   """A thread safe object used by Dataflow source framework.
@@ -820,6 +827,24 @@ class Read(ptransform.PTransform):
                                       label='Read Source'),
             'source_dd': self.source}
 
+  def to_runner_api_parameter(self, context):
+    return (urns.READ_TRANSFORM,
+            beam_runner_api_pb2.ReadPayload(
+                source=self.source.to_runner_api(context),
+                is_bounded=beam_runner_api_pb2.BOUNDED
+                if self.source.is_bounded()
+                else beam_runner_api_pb2.UNBOUNDED))
+
+  @staticmethod
+  def from_runner_api_parameter(parameter, context):
+    return Read(BoundedSource.from_runner_api(parameter.source, context))
+
+
+ptransform.PTransform.register_urn(
+    urns.READ_TRANSFORM,
+    beam_runner_api_pb2.ReadPayload,
+    Read.from_runner_api_parameter)
+
 
 class Write(ptransform.PTransform):
   """A ``PTransform`` that writes to a sink.

http://git-wip-us.apache.org/repos/asf/beam/blob/22e49e60/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
index c1f4238..f29b714 100644
--- a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
+++ b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
@@ -25,6 +25,7 @@ import logging
 from apache_beam import pvalue
 from apache_beam.transforms import ptransform
 from apache_beam.transforms.display import HasDisplayData
+from apache_beam.utils import urns
 
 
 def _dict_printable_fields(dict_object, skip_fields):
@@ -42,7 +43,7 @@ _minor_fields = ['coder', 'key_coder', 'value_coder',
                  'compression_type']
 
 
-class NativeSource(HasDisplayData):
+class NativeSource(HasDisplayData, urns.RunnerApiFn):
   """A source implemented by Dataflow service.
 
   This class is to be only inherited by sources natively implemented by Cloud
@@ -55,12 +56,17 @@ class NativeSource(HasDisplayData):
     """Returns a NativeSourceReader instance associated with this source."""
     raise NotImplementedError
 
+  def is_bounded(self):
+    return True
+
   def __repr__(self):
     return '<{name} {vals}>'.format(
         name=self.__class__.__name__,
         vals=', '.join(_dict_printable_fields(self.__dict__,
                                               _minor_fields)))
 
+  urns.RunnerApiFn.register_pickle_urn(urns.PICKLED_SOURCE)
+
 
 class NativeSourceReader(object):
   """A reader for a source implemented by Dataflow service."""

http://git-wip-us.apache.org/repos/asf/beam/blob/22e49e60/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index e553eea..c155cfd 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -36,8 +36,11 @@ PICKLED_CODER = "beam:coder:pickled_python:v0.1"
 
 PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v0.1"
 FLATTEN_TRANSFORM = "beam:ptransform:flatten:v0.1"
+READ_TRANSFORM = "beam:ptransform:read:v0.1"
 WINDOW_INTO_TRANSFORM = "beam:ptransform:window_into:v0.1"
 
+PICKLED_SOURCE = "beam:source:pickled_python:v0.1"
+
 
 class RunnerApiFn(object):
   """Abstract base class that provides urn registration utilities.