You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/10/10 19:05:12 UTC
[3/6] incubator-beam git commit: Move dataflow native sinks and
sources into dataflow directory.
Move dataflow native sinks and sources into dataflow directory.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/78520758
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/78520758
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/78520758
Branch: refs/heads/python-sdk
Commit: 78520758abc3b1c2b38e26f3ffd64e01870de067
Parents: 90004a0
Author: Robert Bradshaw <ro...@google.com>
Authored: Thu Oct 6 16:52:57 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Oct 10 10:30:00 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/iobase.py | 287 +----------------
.../apache_beam/runners/dataflow/__init__.py | 0
.../runners/dataflow/native_io/__init__.py | 0
.../runners/dataflow/native_io/iobase.py | 319 +++++++++++++++++++
4 files changed, 321 insertions(+), 285 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78520758/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 4305fb6..b83d7eb 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -43,263 +43,8 @@ from apache_beam.transforms import core
from apache_beam.transforms import ptransform
from apache_beam.transforms import window
-
-def _dict_printable_fields(dict_object, skip_fields):
- """Returns a list of strings for the interesting fields of a dict."""
- return ['%s=%r' % (name, value)
- for name, value in dict_object.iteritems()
- # want to output value 0 but not None nor []
- if (value or value == 0)
- and name not in skip_fields]
-
-_minor_fields = ['coder', 'key_coder', 'value_coder',
- 'config_bytes', 'elements',
- 'append_trailing_newlines', 'strip_trailing_newlines',
- 'compression_type']
-
-
-class NativeSource(object):
- """A source implemented by Dataflow service.
-
- This class is to be only inherited by sources natively implemented by Cloud
- Dataflow service, hence should not be sub-classed by users.
-
- This class is deprecated and should not be used to define new sources.
- """
-
- def reader(self):
- """Returns a NativeSourceReader instance associated with this source."""
- raise NotImplementedError
-
- def __repr__(self):
- return '<{name} {vals}>'.format(
- name=self.__class__.__name__,
- vals=', '.join(_dict_printable_fields(self.__dict__,
- _minor_fields)))
-
-
-class NativeSourceReader(object):
- """A reader for a source implemented by Dataflow service."""
-
- def __enter__(self):
- """Opens everything necessary for a reader to function properly."""
- raise NotImplementedError
-
- def __exit__(self, exception_type, exception_value, traceback):
- """Cleans up after a reader executed."""
- raise NotImplementedError
-
- def __iter__(self):
- """Returns an iterator over all the records of the source."""
- raise NotImplementedError
-
- @property
- def returns_windowed_values(self):
- """Returns whether this reader returns windowed values."""
- return False
-
- def get_progress(self):
- """Returns a representation of how far the reader has read.
-
- Returns:
- A SourceReaderProgress object that gives the current progress of the
- reader.
- """
- return
-
- def request_dynamic_split(self, dynamic_split_request):
- """Attempts to split the input in two parts.
-
- The two parts are named the "primary" part and the "residual" part. The
- current 'NativeSourceReader' keeps processing the primary part, while the
- residual part will be processed elsewhere (e.g. perhaps on a different
- worker).
-
- The primary and residual parts, if concatenated, must represent the
- same input as the current input of this 'NativeSourceReader' before this
- call.
-
- The boundary between the primary part and the residual part is
- specified in a framework-specific way using 'DynamicSplitRequest' e.g.,
- if the framework supports the notion of positions, it might be a
- position at which the input is asked to split itself (which is not
- necessarily the same position at which it *will* split itself); it
- might be an approximate fraction of input, or something else.
-
- This function returns a 'DynamicSplitResult', which encodes, in a
- framework-specific way, the information sufficient to construct a
- description of the resulting primary and residual inputs. For example, it
- might, again, be a position demarcating these parts, or it might be a pair
- of fully-specified input descriptions, or something else.
-
- After a successful call to 'request_dynamic_split()', subsequent calls
- should be interpreted relative to the new primary.
-
- Args:
- dynamic_split_request: A 'DynamicSplitRequest' describing the split
- request.
-
- Returns:
- 'None' if the 'DynamicSplitRequest' cannot be honored (in that
- case the input represented by this 'NativeSourceReader' stays the same),
- or a 'DynamicSplitResult' describing how the input was split into a
- primary and residual part.
- """
- logging.debug(
- 'SourceReader %r does not support dynamic splitting. Ignoring dynamic '
- 'split request: %r',
- self, dynamic_split_request)
- return
-
-
-class ReaderProgress(object):
- """A representation of how far a NativeSourceReader has read."""
-
- def __init__(self, position=None, percent_complete=None, remaining_time=None):
-
- self._position = position
-
- if percent_complete is not None:
- percent_complete = float(percent_complete)
- if percent_complete < 0 or percent_complete > 1:
- raise ValueError(
- 'The percent_complete argument was %f. Must be in range [0, 1].'
- % percent_complete)
- self._percent_complete = percent_complete
-
- self._remaining_time = remaining_time
-
- @property
- def position(self):
- """Returns progress, represented as a ReaderPosition object."""
- return self._position
-
- @property
- def percent_complete(self):
- """Returns progress, represented as a percentage of total work.
-
- Progress range from 0.0 (beginning, nothing complete) to 1.0 (end of the
- work range, entire WorkItem complete).
-
- Returns:
- Progress represented as a percentage of total work.
- """
- return self._percent_complete
-
- @property
- def remaining_time(self):
- """Returns progress, represented as an estimated time remaining."""
- return self._remaining_time
-
-
-class ReaderPosition(object):
- """A representation of position in an iteration of a 'NativeSourceReader'."""
-
- def __init__(self, end=None, key=None, byte_offset=None, record_index=None,
- shuffle_position=None, concat_position=None):
- """Initializes ReaderPosition.
-
- A ReaderPosition may get instantiated for one of these position types. Only
- one of these should be specified.
-
- Args:
- end: position is past all other positions. For example, this may be used
- to represent the end position of an unbounded range.
- key: position is a string key.
- byte_offset: position is a byte offset.
- record_index: position is a record index
- shuffle_position: position is a base64 encoded shuffle position.
- concat_position: position is a 'ConcatPosition'.
- """
-
- self.end = end
- self.key = key
- self.byte_offset = byte_offset
- self.record_index = record_index
- self.shuffle_position = shuffle_position
-
- if concat_position is not None:
- assert isinstance(concat_position, ConcatPosition)
- self.concat_position = concat_position
-
-
-class ConcatPosition(object):
- """A position that encapsulate an inner position and an index.
-
- This is used to represent the position of a source that encapsulate several
- other sources.
- """
-
- def __init__(self, index, position):
- """Initializes ConcatPosition.
-
- Args:
- index: index of the source currently being read.
- position: inner position within the source currently being read.
- """
-
- if position is not None:
- assert isinstance(position, ReaderPosition)
- self.index = index
- self.position = position
-
-
-class DynamicSplitRequest(object):
- """Specifies how 'NativeSourceReader.request_dynamic_split' should split.
- """
-
- def __init__(self, progress):
- assert isinstance(progress, ReaderProgress)
- self.progress = progress
-
-
-class DynamicSplitResult(object):
- pass
-
-
-class DynamicSplitResultWithPosition(DynamicSplitResult):
-
- def __init__(self, stop_position):
- assert isinstance(stop_position, ReaderPosition)
- self.stop_position = stop_position
-
-
-class NativeSink(object):
- """A sink implemented by Dataflow service.
-
- This class is to be only inherited by sinks natively implemented by Cloud
- Dataflow service, hence should not be sub-classed by users.
- """
-
- def writer(self):
- """Returns a SinkWriter for this source."""
- raise NotImplementedError
-
- def __repr__(self):
- return '<{name} {vals}>'.format(
- name=self.__class__.__name__,
- vals=_dict_printable_fields(self.__dict__, _minor_fields))
-
-
-class NativeSinkWriter(object):
- """A writer for a sink implemented by Dataflow service."""
-
- def __enter__(self):
- """Opens everything necessary for a writer to function properly."""
- raise NotImplementedError
-
- def __exit__(self, exception_type, exception_value, traceback):
- """Cleans up after a writer executed."""
- raise NotImplementedError
-
- @property
- def takes_windowed_values(self):
- """Returns whether this writer takes windowed values."""
- return False
-
- def Write(self, o): # pylint: disable=invalid-name
- """Writes a record to the sink associated with this writer."""
- raise NotImplementedError
+from apache_beam.runners.dataflow.native_io.iobase import *
+from apache_beam.runners.dataflow.native_io.iobase import _NativeWrite
# Encapsulates information about a bundle of a source generated when method
@@ -887,34 +632,6 @@ class Writer(object):
raise NotImplementedError
-class _NativeWrite(ptransform.PTransform):
- """A PTransform for writing to a Dataflow native sink.
-
- These are sinks that are implemented natively by the Dataflow service
- and hence should not be updated by users. These sinks are processed
- using a Dataflow native write transform.
-
- Applying this transform results in a ``pvalue.PDone``.
- """
-
- def __init__(self, *args, **kwargs):
- """Initializes a Write transform.
-
- Args:
- *args: A tuple of position arguments.
- **kwargs: A dictionary of keyword arguments.
-
- The *args, **kwargs are expected to be (label, sink) or (sink).
- """
- label, sink = self.parse_label_and_arg(args, kwargs, 'sink')
- super(_NativeWrite, self).__init__(label)
- self.sink = sink
-
- def apply(self, pcoll):
- self._check_pcollection(pcoll)
- return pvalue.PDone(pcoll.pipeline)
-
-
class Read(ptransform.PTransform):
"""A transform that reads a PCollection."""
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78520758/sdks/python/apache_beam/runners/dataflow/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/__init__.py b/sdks/python/apache_beam/runners/dataflow/__init__.py
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78520758/sdks/python/apache_beam/runners/dataflow/native_io/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/__init__.py b/sdks/python/apache_beam/runners/dataflow/native_io/__init__.py
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78520758/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
new file mode 100644
index 0000000..bccca9f
--- /dev/null
+++ b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
@@ -0,0 +1,319 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Dataflow native sources and sinks.
+"""
+
+from collections import namedtuple
+
+import logging
+import random
+import uuid
+
+from apache_beam import pvalue
+from apache_beam.coders import PickleCoder
+from apache_beam.pvalue import AsIter
+from apache_beam.pvalue import AsSingleton
+from apache_beam.transforms import core
+from apache_beam.transforms import ptransform
+from apache_beam.transforms import window
+
+
+def _dict_printable_fields(dict_object, skip_fields):
+ """Returns a list of strings for the interesting fields of a dict."""
+ return ['%s=%r' % (name, value)
+ for name, value in dict_object.iteritems()
+ # want to output value 0 but not None nor []
+ if (value or value == 0)
+ and name not in skip_fields]
+
+_minor_fields = ['coder', 'key_coder', 'value_coder',
+ 'config_bytes', 'elements',
+ 'append_trailing_newlines', 'strip_trailing_newlines',
+ 'compression_type']
+
+
+class NativeSource(object):
+ """A source implemented by Dataflow service.
+
+ This class is to be only inherited by sources natively implemented by Cloud
+ Dataflow service, hence should not be sub-classed by users.
+
+ This class is deprecated and should not be used to define new sources.
+ """
+
+ def reader(self):
+ """Returns a NativeSourceReader instance associated with this source."""
+ raise NotImplementedError
+
+ def __repr__(self):
+ return '<{name} {vals}>'.format(
+ name=self.__class__.__name__,
+ vals=', '.join(_dict_printable_fields(self.__dict__,
+ _minor_fields)))
+
+
+class NativeSourceReader(object):
+ """A reader for a source implemented by Dataflow service."""
+
+ def __enter__(self):
+ """Opens everything necessary for a reader to function properly."""
+ raise NotImplementedError
+
+ def __exit__(self, exception_type, exception_value, traceback):
+ """Cleans up after a reader executed."""
+ raise NotImplementedError
+
+ def __iter__(self):
+ """Returns an iterator over all the records of the source."""
+ raise NotImplementedError
+
+ @property
+ def returns_windowed_values(self):
+ """Returns whether this reader returns windowed values."""
+ return False
+
+ def get_progress(self):
+ """Returns a representation of how far the reader has read.
+
+ Returns:
+ A ReaderProgress object that gives the current progress of the
+ reader.
+ """
+ return
+
+ def request_dynamic_split(self, dynamic_split_request):
+ """Attempts to split the input in two parts.
+
+ The two parts are named the "primary" part and the "residual" part. The
+ current 'NativeSourceReader' keeps processing the primary part, while the
+ residual part will be processed elsewhere (e.g. perhaps on a different
+ worker).
+
+ The primary and residual parts, if concatenated, must represent the
+ same input as the current input of this 'NativeSourceReader' before this
+ call.
+
+ The boundary between the primary part and the residual part is
+ specified in a framework-specific way using 'DynamicSplitRequest' e.g.,
+ if the framework supports the notion of positions, it might be a
+ position at which the input is asked to split itself (which is not
+ necessarily the same position at which it *will* split itself); it
+ might be an approximate fraction of input, or something else.
+
+ This function returns a 'DynamicSplitResult', which encodes, in a
+ framework-specific way, the information sufficient to construct a
+ description of the resulting primary and residual inputs. For example, it
+ might, again, be a position demarcating these parts, or it might be a pair
+ of fully-specified input descriptions, or something else.
+
+ After a successful call to 'request_dynamic_split()', subsequent calls
+ should be interpreted relative to the new primary.
+
+ Args:
+ dynamic_split_request: A 'DynamicSplitRequest' describing the split
+ request.
+
+ Returns:
+ 'None' if the 'DynamicSplitRequest' cannot be honored (in that
+ case the input represented by this 'NativeSourceReader' stays the same),
+ or a 'DynamicSplitResult' describing how the input was split into a
+ primary and residual part.
+ """
+ logging.debug(
+ 'SourceReader %r does not support dynamic splitting. Ignoring dynamic '
+ 'split request: %r',
+ self, dynamic_split_request)
+ return
+
+
+class ReaderProgress(object):
+ """A representation of how far a NativeSourceReader has read."""
+
+ def __init__(self, position=None, percent_complete=None, remaining_time=None):
+
+ self._position = position
+
+ if percent_complete is not None:
+ percent_complete = float(percent_complete)
+ if percent_complete < 0 or percent_complete > 1:
+ raise ValueError(
+ 'The percent_complete argument was %f. Must be in range [0, 1].'
+ % percent_complete)
+ self._percent_complete = percent_complete
+
+ self._remaining_time = remaining_time
+
+ @property
+ def position(self):
+ """Returns progress, represented as a ReaderPosition object."""
+ return self._position
+
+ @property
+ def percent_complete(self):
+ """Returns progress, represented as a percentage of total work.
+
+ Progress ranges from 0.0 (beginning, nothing complete) to 1.0 (end of the
+ work range, entire WorkItem complete).
+
+ Returns:
+ Progress represented as a percentage of total work.
+ """
+ return self._percent_complete
+
+ @property
+ def remaining_time(self):
+ """Returns progress, represented as an estimated time remaining."""
+ return self._remaining_time
+
+
+class ReaderPosition(object):
+ """A representation of position in an iteration of a 'NativeSourceReader'."""
+
+ def __init__(self, end=None, key=None, byte_offset=None, record_index=None,
+ shuffle_position=None, concat_position=None):
+ """Initializes ReaderPosition.
+
+ A ReaderPosition may get instantiated for one of these position types. Only
+ one of these should be specified.
+
+ Args:
+ end: position is past all other positions. For example, this may be used
+ to represent the end position of an unbounded range.
+ key: position is a string key.
+ byte_offset: position is a byte offset.
+ record_index: position is a record index.
+ shuffle_position: position is a base64 encoded shuffle position.
+ concat_position: position is a 'ConcatPosition'.
+ """
+
+ self.end = end
+ self.key = key
+ self.byte_offset = byte_offset
+ self.record_index = record_index
+ self.shuffle_position = shuffle_position
+
+ if concat_position is not None:
+ assert isinstance(concat_position, ConcatPosition)
+ self.concat_position = concat_position
+
+
+class ConcatPosition(object):
+ """A position that encapsulates an inner position and an index.
+
+ This is used to represent the position of a source that encapsulates several
+ other sources.
+ """
+
+ def __init__(self, index, position):
+ """Initializes ConcatPosition.
+
+ Args:
+ index: index of the source currently being read.
+ position: inner position within the source currently being read.
+ """
+
+ if position is not None:
+ assert isinstance(position, ReaderPosition)
+ self.index = index
+ self.position = position
+
+
+class DynamicSplitRequest(object):
+ """Specifies how 'NativeSourceReader.request_dynamic_split' should split.
+ """
+
+ def __init__(self, progress):
+ assert isinstance(progress, ReaderProgress)
+ self.progress = progress
+
+
+class DynamicSplitResult(object):
+ pass
+
+
+class DynamicSplitResultWithPosition(DynamicSplitResult):
+
+ def __init__(self, stop_position):
+ assert isinstance(stop_position, ReaderPosition)
+ self.stop_position = stop_position
+
+
+class NativeSink(object):
+ """A sink implemented by Dataflow service.
+
+ This class is to be only inherited by sinks natively implemented by Cloud
+ Dataflow service, hence should not be sub-classed by users.
+ """
+
+ def writer(self):
+ """Returns a NativeSinkWriter for this sink."""
+ raise NotImplementedError
+
+ def __repr__(self):
+ return '<{name} {vals}>'.format(
+ name=self.__class__.__name__,
+ vals=_dict_printable_fields(self.__dict__, _minor_fields))
+
+
+class NativeSinkWriter(object):
+ """A writer for a sink implemented by Dataflow service."""
+
+ def __enter__(self):
+ """Opens everything necessary for a writer to function properly."""
+ raise NotImplementedError
+
+ def __exit__(self, exception_type, exception_value, traceback):
+ """Cleans up after a writer executed."""
+ raise NotImplementedError
+
+ @property
+ def takes_windowed_values(self):
+ """Returns whether this writer takes windowed values."""
+ return False
+
+ def Write(self, o): # pylint: disable=invalid-name
+ """Writes a record to the sink associated with this writer."""
+ raise NotImplementedError
+
+
+class _NativeWrite(ptransform.PTransform):
+ """A PTransform for writing to a Dataflow native sink.
+
+ These are sinks that are implemented natively by the Dataflow service
+ and hence should not be updated by users. These sinks are processed
+ using a Dataflow native write transform.
+
+ Applying this transform results in a ``pvalue.PDone``.
+ """
+
+ def __init__(self, *args, **kwargs):
+ """Initializes a Write transform.
+
+ Args:
+ *args: A tuple of positional arguments.
+ **kwargs: A dictionary of keyword arguments.
+
+ The *args, **kwargs are expected to be (label, sink) or (sink).
+ """
+ label, sink = self.parse_label_and_arg(args, kwargs, 'sink')
+ super(_NativeWrite, self).__init__(label)
+ self.sink = sink
+
+ def apply(self, pcoll):
+ self._check_pcollection(pcoll)
+ return pvalue.PDone(pcoll.pipeline)