You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/07/26 17:56:08 UTC
[3/4] beam git commit: Create common superclass for all Sources.
Create common superclass for all Sources.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2d307f29
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2d307f29
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2d307f29
Branch: refs/heads/master
Commit: 2d307f29b77e6aeb28f624f50246e99422a1173d
Parents: 24b39e4
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Jul 25 13:33:03 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Jul 26 10:55:54 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/iobase.py | 12 ++++++++----
.../apache_beam/runners/dataflow/native_io/iobase.py | 6 ++----
2 files changed, 10 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/2d307f29/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index bdd06e1..db75fe3 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -72,7 +72,13 @@ SourceBundle = namedtuple(
'weight source start_position stop_position')
-class BoundedSource(HasDisplayData, urns.RunnerApiFn):
+class SourceBase(HasDisplayData, urns.RunnerApiFn):
+ """Base class for all sources that can be passed to beam.io.Read(...).
+ """
+ urns.RunnerApiFn.register_pickle_urn(urns.PICKLED_SOURCE)
+
+
+class BoundedSource(SourceBase):
"""A source that reads a finite amount of input records.
This class defines following operations which can be used to read the source
@@ -194,8 +200,6 @@ class BoundedSource(HasDisplayData, urns.RunnerApiFn):
def is_bounded(self):
return True
- urns.RunnerApiFn.register_pickle_urn(urns.PICKLED_SOURCE)
-
class RangeTracker(object):
"""A thread safe object used by Dataflow source framework.
@@ -837,7 +841,7 @@ class Read(ptransform.PTransform):
@staticmethod
def from_runner_api_parameter(parameter, context):
- return Read(BoundedSource.from_runner_api(parameter.source, context))
+ return Read(SourceBase.from_runner_api(parameter.source, context))
ptransform.PTransform.register_urn(
http://git-wip-us.apache.org/repos/asf/beam/blob/2d307f29/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
index f29b714..2f2316f 100644
--- a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
+++ b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
@@ -23,9 +23,9 @@ For internal use only; no backwards-compatibility guarantees.
import logging
from apache_beam import pvalue
+from apache_beam.io import iobase
from apache_beam.transforms import ptransform
from apache_beam.transforms.display import HasDisplayData
-from apache_beam.utils import urns
def _dict_printable_fields(dict_object, skip_fields):
@@ -43,7 +43,7 @@ _minor_fields = ['coder', 'key_coder', 'value_coder',
'compression_type']
-class NativeSource(HasDisplayData, urns.RunnerApiFn):
+class NativeSource(iobase.SourceBase):
"""A source implemented by Dataflow service.
This class is to be only inherited by sources natively implemented by Cloud
@@ -65,8 +65,6 @@ class NativeSource(HasDisplayData, urns.RunnerApiFn):
vals=', '.join(_dict_printable_fields(self.__dict__,
_minor_fields)))
- urns.RunnerApiFn.register_pickle_urn(urns.PICKLED_SOURCE)
-
class NativeSourceReader(object):
"""A reader for a source implemented by Dataflow service."""