You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/10/10 19:05:12 UTC

[3/6] incubator-beam git commit: Move dataflow native sinks and sources into dataflow directory.

Move dataflow native sinks and sources into dataflow directory.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/78520758
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/78520758
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/78520758

Branch: refs/heads/python-sdk
Commit: 78520758abc3b1c2b38e26f3ffd64e01870de067
Parents: 90004a0
Author: Robert Bradshaw <ro...@google.com>
Authored: Thu Oct 6 16:52:57 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Oct 10 10:30:00 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/iobase.py            | 287 +----------------
 .../apache_beam/runners/dataflow/__init__.py    |   0
 .../runners/dataflow/native_io/__init__.py      |   0
 .../runners/dataflow/native_io/iobase.py        | 319 +++++++++++++++++++
 4 files changed, 321 insertions(+), 285 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78520758/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 4305fb6..b83d7eb 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -43,263 +43,8 @@ from apache_beam.transforms import core
 from apache_beam.transforms import ptransform
 from apache_beam.transforms import window
 
-
-def _dict_printable_fields(dict_object, skip_fields):
-  """Returns a list of strings for the interesting fields of a dict."""
-  return ['%s=%r' % (name, value)
-          for name, value in dict_object.iteritems()
-          # want to output value 0 but not None nor []
-          if (value or value == 0)
-          and name not in skip_fields]
-
-_minor_fields = ['coder', 'key_coder', 'value_coder',
-                 'config_bytes', 'elements',
-                 'append_trailing_newlines', 'strip_trailing_newlines',
-                 'compression_type']
-
-
-class NativeSource(object):
-  """A source implemented by Dataflow service.
-
-  This class is to be only inherited by sources natively implemented by Cloud
-  Dataflow service, hence should not be sub-classed by users.
-
-  This class is deprecated and should not be used to define new sources.
-  """
-
-  def reader(self):
-    """Returns a NativeSourceReader instance associated with this source."""
-    raise NotImplementedError
-
-  def __repr__(self):
-    return '<{name} {vals}>'.format(
-        name=self.__class__.__name__,
-        vals=', '.join(_dict_printable_fields(self.__dict__,
-                                              _minor_fields)))
-
-
-class NativeSourceReader(object):
-  """A reader for a source implemented by Dataflow service."""
-
-  def __enter__(self):
-    """Opens everything necessary for a reader to function properly."""
-    raise NotImplementedError
-
-  def __exit__(self, exception_type, exception_value, traceback):
-    """Cleans up after a reader executed."""
-    raise NotImplementedError
-
-  def __iter__(self):
-    """Returns an iterator over all the records of the source."""
-    raise NotImplementedError
-
-  @property
-  def returns_windowed_values(self):
-    """Returns whether this reader returns windowed values."""
-    return False
-
-  def get_progress(self):
-    """Returns a representation of how far the reader has read.
-
-    Returns:
-      A SourceReaderProgress object that gives the current progress of the
-      reader.
-    """
-    return
-
-  def request_dynamic_split(self, dynamic_split_request):
-    """Attempts to split the input in two parts.
-
-    The two parts are named the "primary" part and the "residual" part. The
-    current 'NativeSourceReader' keeps processing the primary part, while the
-    residual part will be processed elsewhere (e.g. perhaps on a different
-    worker).
-
-    The primary and residual parts, if concatenated, must represent the
-    same input as the current input of this 'NativeSourceReader' before this
-    call.
-
-    The boundary between the primary part and the residual part is
-    specified in a framework-specific way using 'DynamicSplitRequest' e.g.,
-    if the framework supports the notion of positions, it might be a
-    position at which the input is asked to split itself (which is not
-    necessarily the same position at which it *will* split itself); it
-    might be an approximate fraction of input, or something else.
-
-    This function returns a 'DynamicSplitResult', which encodes, in a
-    framework-specific way, the information sufficient to construct a
-    description of the resulting primary and residual inputs. For example, it
-    might, again, be a position demarcating these parts, or it might be a pair
-    of fully-specified input descriptions, or something else.
-
-    After a successful call to 'request_dynamic_split()', subsequent calls
-    should be interpreted relative to the new primary.
-
-    Args:
-      dynamic_split_request: A 'DynamicSplitRequest' describing the split
-        request.
-
-    Returns:
-      'None' if the 'DynamicSplitRequest' cannot be honored (in that
-      case the input represented by this 'NativeSourceReader' stays the same),
-      or a 'DynamicSplitResult' describing how the input was split into a
-      primary and residual part.
-    """
-    logging.debug(
-        'SourceReader %r does not support dynamic splitting. Ignoring dynamic '
-        'split request: %r',
-        self, dynamic_split_request)
-    return
-
-
-class ReaderProgress(object):
-  """A representation of how far a NativeSourceReader has read."""
-
-  def __init__(self, position=None, percent_complete=None, remaining_time=None):
-
-    self._position = position
-
-    if percent_complete is not None:
-      percent_complete = float(percent_complete)
-      if percent_complete < 0 or percent_complete > 1:
-        raise ValueError(
-            'The percent_complete argument was %f. Must be in range [0, 1].'
-            % percent_complete)
-    self._percent_complete = percent_complete
-
-    self._remaining_time = remaining_time
-
-  @property
-  def position(self):
-    """Returns progress, represented as a ReaderPosition object."""
-    return self._position
-
-  @property
-  def percent_complete(self):
-    """Returns progress, represented as a percentage of total work.
-
-    Progress range from 0.0 (beginning, nothing complete) to 1.0 (end of the
-    work range, entire WorkItem complete).
-
-    Returns:
-      Progress represented as a percentage of total work.
-    """
-    return self._percent_complete
-
-  @property
-  def remaining_time(self):
-    """Returns progress, represented as an estimated time remaining."""
-    return self._remaining_time
-
-
-class ReaderPosition(object):
-  """A representation of position in an iteration of a 'NativeSourceReader'."""
-
-  def __init__(self, end=None, key=None, byte_offset=None, record_index=None,
-               shuffle_position=None, concat_position=None):
-    """Initializes ReaderPosition.
-
-    A ReaderPosition may get instantiated for one of these position types. Only
-    one of these should be specified.
-
-    Args:
-      end: position is past all other positions. For example, this may be used
-        to represent the end position of an unbounded range.
-      key: position is a string key.
-      byte_offset: position is a byte offset.
-      record_index: position is a record index
-      shuffle_position: position is a base64 encoded shuffle position.
-      concat_position: position is a 'ConcatPosition'.
-    """
-
-    self.end = end
-    self.key = key
-    self.byte_offset = byte_offset
-    self.record_index = record_index
-    self.shuffle_position = shuffle_position
-
-    if concat_position is not None:
-      assert isinstance(concat_position, ConcatPosition)
-    self.concat_position = concat_position
-
-
-class ConcatPosition(object):
-  """A position that encapsulate an inner position and an index.
-
-  This is used to represent the position of a source that encapsulate several
-  other sources.
-  """
-
-  def __init__(self, index, position):
-    """Initializes ConcatPosition.
-
-    Args:
-      index: index of the source currently being read.
-      position: inner position within the source currently being read.
-    """
-
-    if position is not None:
-      assert isinstance(position, ReaderPosition)
-    self.index = index
-    self.position = position
-
-
-class DynamicSplitRequest(object):
-  """Specifies how 'NativeSourceReader.request_dynamic_split' should split.
-  """
-
-  def __init__(self, progress):
-    assert isinstance(progress, ReaderProgress)
-    self.progress = progress
-
-
-class DynamicSplitResult(object):
-  pass
-
-
-class DynamicSplitResultWithPosition(DynamicSplitResult):
-
-  def __init__(self, stop_position):
-    assert isinstance(stop_position, ReaderPosition)
-    self.stop_position = stop_position
-
-
-class NativeSink(object):
-  """A sink implemented by Dataflow service.
-
-  This class is to be only inherited by sinks natively implemented by Cloud
-  Dataflow service, hence should not be sub-classed by users.
-  """
-
-  def writer(self):
-    """Returns a SinkWriter for this source."""
-    raise NotImplementedError
-
-  def __repr__(self):
-    return '<{name} {vals}>'.format(
-        name=self.__class__.__name__,
-        vals=_dict_printable_fields(self.__dict__, _minor_fields))
-
-
-class NativeSinkWriter(object):
-  """A writer for a sink implemented by Dataflow service."""
-
-  def __enter__(self):
-    """Opens everything necessary for a writer to function properly."""
-    raise NotImplementedError
-
-  def __exit__(self, exception_type, exception_value, traceback):
-    """Cleans up after a writer executed."""
-    raise NotImplementedError
-
-  @property
-  def takes_windowed_values(self):
-    """Returns whether this writer takes windowed values."""
-    return False
-
-  def Write(self, o):  # pylint: disable=invalid-name
-    """Writes a record to the sink associated with this writer."""
-    raise NotImplementedError
+from apache_beam.runners.dataflow.native_io.iobase import *
+from apache_beam.runners.dataflow.native_io.iobase import _NativeWrite
 
 
 # Encapsulates information about a bundle of a source generated when method
@@ -887,34 +632,6 @@ class Writer(object):
     raise NotImplementedError
 
 
-class _NativeWrite(ptransform.PTransform):
-  """A PTransform for writing to a Dataflow native sink.
-
-  These are sinks that are implemented natively by the Dataflow service
-  and hence should not be updated by users. These sinks are processed
-  using a Dataflow native write transform.
-
-  Applying this transform results in a ``pvalue.PDone``.
-  """
-
-  def __init__(self, *args, **kwargs):
-    """Initializes a Write transform.
-
-    Args:
-      *args: A tuple of position arguments.
-      **kwargs: A dictionary of keyword arguments.
-
-    The *args, **kwargs are expected to be (label, sink) or (sink).
-    """
-    label, sink = self.parse_label_and_arg(args, kwargs, 'sink')
-    super(_NativeWrite, self).__init__(label)
-    self.sink = sink
-
-  def apply(self, pcoll):
-    self._check_pcollection(pcoll)
-    return pvalue.PDone(pcoll.pipeline)
-
-
 class Read(ptransform.PTransform):
   """A transform that reads a PCollection."""
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78520758/sdks/python/apache_beam/runners/dataflow/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/__init__.py b/sdks/python/apache_beam/runners/dataflow/__init__.py
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78520758/sdks/python/apache_beam/runners/dataflow/native_io/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/__init__.py b/sdks/python/apache_beam/runners/dataflow/native_io/__init__.py
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78520758/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
new file mode 100644
index 0000000..bccca9f
--- /dev/null
+++ b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
@@ -0,0 +1,319 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Dataflow native sources and sinks.
+"""
+
+from collections import namedtuple
+
+import logging
+import random
+import uuid
+
+from apache_beam import pvalue
+from apache_beam.coders import PickleCoder
+from apache_beam.pvalue import AsIter
+from apache_beam.pvalue import AsSingleton
+from apache_beam.transforms import core
+from apache_beam.transforms import ptransform
+from apache_beam.transforms import window
+
+
+def _dict_printable_fields(dict_object, skip_fields):
+  """Returns a list of strings for the interesting fields of a dict."""
+  return ['%s=%r' % (name, value)
+          for name, value in dict_object.iteritems()
+          # want to output value 0 but not None nor []
+          if (value or value == 0)
+          and name not in skip_fields]
+
+_minor_fields = ['coder', 'key_coder', 'value_coder',
+                 'config_bytes', 'elements',
+                 'append_trailing_newlines', 'strip_trailing_newlines',
+                 'compression_type']
+
+
+class NativeSource(object):
+  """A source implemented by Dataflow service.
+
+  This class is to be only inherited by sources natively implemented by Cloud
+  Dataflow service, hence should not be sub-classed by users.
+
+  This class is deprecated and should not be used to define new sources.
+  """
+
+  def reader(self):
+    """Returns a NativeSourceReader instance associated with this source."""
+    raise NotImplementedError
+
+  def __repr__(self):
+    return '<{name} {vals}>'.format(
+        name=self.__class__.__name__,
+        vals=', '.join(_dict_printable_fields(self.__dict__,
+                                              _minor_fields)))
+
+
+class NativeSourceReader(object):
+  """A reader for a source implemented by Dataflow service."""
+
+  def __enter__(self):
+    """Opens everything necessary for a reader to function properly."""
+    raise NotImplementedError
+
+  def __exit__(self, exception_type, exception_value, traceback):
+    """Cleans up after a reader executed."""
+    raise NotImplementedError
+
+  def __iter__(self):
+    """Returns an iterator over all the records of the source."""
+    raise NotImplementedError
+
+  @property
+  def returns_windowed_values(self):
+    """Returns whether this reader returns windowed values."""
+    return False
+
+  def get_progress(self):
+    """Returns a representation of how far the reader has read.
+
+    Returns:
+      A ReaderProgress object that gives the current progress of the
+      reader.
+    """
+    return
+
+  def request_dynamic_split(self, dynamic_split_request):
+    """Attempts to split the input in two parts.
+
+    The two parts are named the "primary" part and the "residual" part. The
+    current 'NativeSourceReader' keeps processing the primary part, while the
+    residual part will be processed elsewhere (e.g. perhaps on a different
+    worker).
+
+    The primary and residual parts, if concatenated, must represent the
+    same input as the current input of this 'NativeSourceReader' before this
+    call.
+
+    The boundary between the primary part and the residual part is
+    specified in a framework-specific way using 'DynamicSplitRequest' e.g.,
+    if the framework supports the notion of positions, it might be a
+    position at which the input is asked to split itself (which is not
+    necessarily the same position at which it *will* split itself); it
+    might be an approximate fraction of input, or something else.
+
+    This function returns a 'DynamicSplitResult', which encodes, in a
+    framework-specific way, the information sufficient to construct a
+    description of the resulting primary and residual inputs. For example, it
+    might, again, be a position demarcating these parts, or it might be a pair
+    of fully-specified input descriptions, or something else.
+
+    After a successful call to 'request_dynamic_split()', subsequent calls
+    should be interpreted relative to the new primary.
+
+    Args:
+      dynamic_split_request: A 'DynamicSplitRequest' describing the split
+        request.
+
+    Returns:
+      'None' if the 'DynamicSplitRequest' cannot be honored (in that
+      case the input represented by this 'NativeSourceReader' stays the same),
+      or a 'DynamicSplitResult' describing how the input was split into a
+      primary and residual part.
+    """
+    logging.debug(
+        'SourceReader %r does not support dynamic splitting. Ignoring dynamic '
+        'split request: %r',
+        self, dynamic_split_request)
+    return
+
+
+class ReaderProgress(object):
+  """A representation of how far a NativeSourceReader has read."""
+
+  def __init__(self, position=None, percent_complete=None, remaining_time=None):
+
+    self._position = position
+
+    if percent_complete is not None:
+      percent_complete = float(percent_complete)
+      if percent_complete < 0 or percent_complete > 1:
+        raise ValueError(
+            'The percent_complete argument was %f. Must be in range [0, 1].'
+            % percent_complete)
+    self._percent_complete = percent_complete
+
+    self._remaining_time = remaining_time
+
+  @property
+  def position(self):
+    """Returns progress, represented as a ReaderPosition object."""
+    return self._position
+
+  @property
+  def percent_complete(self):
+    """Returns progress, represented as a percentage of total work.
+
+    Progress ranges from 0.0 (beginning, nothing complete) to 1.0 (end of the
+    work range, entire WorkItem complete).
+
+    Returns:
+      Progress represented as a percentage of total work.
+    """
+    return self._percent_complete
+
+  @property
+  def remaining_time(self):
+    """Returns progress, represented as an estimated time remaining."""
+    return self._remaining_time
+
+
+class ReaderPosition(object):
+  """A representation of position in an iteration of a 'NativeSourceReader'."""
+
+  def __init__(self, end=None, key=None, byte_offset=None, record_index=None,
+               shuffle_position=None, concat_position=None):
+    """Initializes ReaderPosition.
+
+    A ReaderPosition may get instantiated for one of these position types. Only
+    one of these should be specified.
+
+    Args:
+      end: position is past all other positions. For example, this may be used
+        to represent the end position of an unbounded range.
+      key: position is a string key.
+      byte_offset: position is a byte offset.
+      record_index: position is a record index.
+      shuffle_position: position is a base64 encoded shuffle position.
+      concat_position: position is a 'ConcatPosition'.
+    """
+
+    self.end = end
+    self.key = key
+    self.byte_offset = byte_offset
+    self.record_index = record_index
+    self.shuffle_position = shuffle_position
+
+    if concat_position is not None:
+      assert isinstance(concat_position, ConcatPosition)
+    self.concat_position = concat_position
+
+
+class ConcatPosition(object):
+  """A position that encapsulates an inner position and an index.
+
+  This is used to represent the position of a source that encapsulates several
+  other sources.
+  """
+
+  def __init__(self, index, position):
+    """Initializes ConcatPosition.
+
+    Args:
+      index: index of the source currently being read.
+      position: inner position within the source currently being read.
+    """
+
+    if position is not None:
+      assert isinstance(position, ReaderPosition)
+    self.index = index
+    self.position = position
+
+
+class DynamicSplitRequest(object):
+  """Specifies how 'NativeSourceReader.request_dynamic_split' should split.
+  """
+
+  def __init__(self, progress):
+    assert isinstance(progress, ReaderProgress)
+    self.progress = progress
+
+
+class DynamicSplitResult(object):
+  pass
+
+
+class DynamicSplitResultWithPosition(DynamicSplitResult):
+
+  def __init__(self, stop_position):
+    assert isinstance(stop_position, ReaderPosition)
+    self.stop_position = stop_position
+
+
+class NativeSink(object):
+  """A sink implemented by Dataflow service.
+
+  This class is to be only inherited by sinks natively implemented by Cloud
+  Dataflow service, hence should not be sub-classed by users.
+  """
+
+  def writer(self):
+    """Returns a NativeSinkWriter for this sink."""
+    raise NotImplementedError
+
+  def __repr__(self):
+    return '<{name} {vals}>'.format(
+        name=self.__class__.__name__,
+        vals=_dict_printable_fields(self.__dict__, _minor_fields))
+
+
+class NativeSinkWriter(object):
+  """A writer for a sink implemented by Dataflow service."""
+
+  def __enter__(self):
+    """Opens everything necessary for a writer to function properly."""
+    raise NotImplementedError
+
+  def __exit__(self, exception_type, exception_value, traceback):
+    """Cleans up after a writer executed."""
+    raise NotImplementedError
+
+  @property
+  def takes_windowed_values(self):
+    """Returns whether this writer takes windowed values."""
+    return False
+
+  def Write(self, o):  # pylint: disable=invalid-name
+    """Writes a record to the sink associated with this writer."""
+    raise NotImplementedError
+
+
+class _NativeWrite(ptransform.PTransform):
+  """A PTransform for writing to a Dataflow native sink.
+
+  These are sinks that are implemented natively by the Dataflow service
+  and hence should not be updated by users. These sinks are processed
+  using a Dataflow native write transform.
+
+  Applying this transform results in a ``pvalue.PDone``.
+  """
+
+  def __init__(self, *args, **kwargs):
+    """Initializes a Write transform.
+
+    Args:
+      *args: A tuple of positional arguments.
+      **kwargs: A dictionary of keyword arguments.
+
+    The *args, **kwargs are expected to be (label, sink) or (sink).
+    """
+    label, sink = self.parse_label_and_arg(args, kwargs, 'sink')
+    super(_NativeWrite, self).__init__(label)
+    self.sink = sink
+
+  def apply(self, pcoll):
+    self._check_pcollection(pcoll)
+    return pvalue.PDone(pcoll.pipeline)