You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/06/14 23:12:46 UTC

[11/50] [abbrv] incubator-beam git commit: Move all files to apache_beam folder

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/io/iobase.py b/sdks/python/google/cloud/dataflow/io/iobase.py
deleted file mode 100644
index 26ebeb5..0000000
--- a/sdks/python/google/cloud/dataflow/io/iobase.py
+++ /dev/null
@@ -1,1073 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Sources and sinks.
-
-A Source manages record-oriented data input from a particular kind of source
-(e.g. a set of files, a database table, etc.). The reader() method of a source
-returns a reader object supporting the iterator protocol; iteration yields
-raw records of unprocessed, serialized data.
-
-
-A Sink manages record-oriented data output to a particular kind of sink
-(e.g. a set of files, a database table, etc.). The writer() method of a sink
-returns a writer object supporting writing records of serialized data to
-the sink.
-"""
-
-from collections import namedtuple
-
-import logging
-import random
-import uuid
-
-from google.cloud.dataflow import pvalue
-from google.cloud.dataflow.coders import PickleCoder
-from google.cloud.dataflow.pvalue import AsIter
-from google.cloud.dataflow.pvalue import AsSingleton
-from google.cloud.dataflow.transforms import core
-from google.cloud.dataflow.transforms import ptransform
-from google.cloud.dataflow.transforms import window
-
-
-def _dict_printable_fields(dict_object, skip_fields):
-  """Returns a list of strings for the interesting fields of a dict."""
-  return ['%s=%r' % (name, value)
-          for name, value in dict_object.iteritems()
-          # want to output value 0 but not None nor []
-          if (value or value == 0)
-          and name not in skip_fields]
-
-_minor_fields = ['coder', 'key_coder', 'value_coder',
-                 'config_bytes', 'elements',
-                 'append_trailing_newlines', 'strip_trailing_newlines',
-                 'compression_type']
-
-
-class NativeSource(object):
-  """A source implemented by Dataflow service.
-
-  This class is to be only inherited by sources natively implemented by Cloud
-  Dataflow service, hence should not be sub-classed by users.
-
-  This class is deprecated and should not be used to define new sources.
-  """
-
-  def reader(self):
-    """Returns a NativeSourceReader instance associated with this source."""
-    raise NotImplementedError
-
-  def __repr__(self):
-    return '<{name} {vals}>'.format(
-        name=self.__class__.__name__,
-        vals=', '.join(_dict_printable_fields(self.__dict__,
-                                              _minor_fields)))
-
-
-class NativeSourceReader(object):
-  """A reader for a source implemented by Dataflow service."""
-
-  def __enter__(self):
-    """Opens everything necessary for a reader to function properly."""
-    raise NotImplementedError
-
-  def __exit__(self, exception_type, exception_value, traceback):
-    """Cleans up after a reader executed."""
-    raise NotImplementedError
-
-  def __iter__(self):
-    """Returns an iterator over all the records of the source."""
-    raise NotImplementedError
-
-  @property
-  def returns_windowed_values(self):
-    """Returns whether this reader returns windowed values."""
-    return False
-
-  def get_progress(self):
-    """Returns a representation of how far the reader has read.
-
-    Returns:
-      A ReaderProgress object that gives the current progress of the
-      reader.
-    """
-    return
-
-  def request_dynamic_split(self, dynamic_split_request):
-    """Attempts to split the input in two parts.
-
-    The two parts are named the "primary" part and the "residual" part. The
-    current 'NativeSourceReader' keeps processing the primary part, while the
-    residual part will be processed elsewhere (e.g. perhaps on a different
-    worker).
-
-    The primary and residual parts, if concatenated, must represent the
-    same input as the current input of this 'NativeSourceReader' before this
-    call.
-
-    The boundary between the primary part and the residual part is
-    specified in a framework-specific way using 'DynamicSplitRequest' e.g.,
-    if the framework supports the notion of positions, it might be a
-    position at which the input is asked to split itself (which is not
-    necessarily the same position at which it *will* split itself); it
-    might be an approximate fraction of input, or something else.
-
-    This function returns a 'DynamicSplitResult', which encodes, in a
-    framework-specific way, the information sufficient to construct a
-    description of the resulting primary and residual inputs. For example, it
-    might, again, be a position demarcating these parts, or it might be a pair
-    of fully-specified input descriptions, or something else.
-
-    After a successful call to 'request_dynamic_split()', subsequent calls
-    should be interpreted relative to the new primary.
-
-    Args:
-      dynamic_split_request: A 'DynamicSplitRequest' describing the split
-        request.
-
-    Returns:
-      'None' if the 'DynamicSplitRequest' cannot be honored (in that
-      case the input represented by this 'NativeSourceReader' stays the same),
-      or a 'DynamicSplitResult' describing how the input was split into a
-      primary and residual part.
-    """
-    logging.debug(
-        'SourceReader %r does not support dynamic splitting. Ignoring dynamic '
-        'split request: %r',
-        self, dynamic_split_request)
-    return
-
-
-class ReaderProgress(object):
-  """A representation of how far a NativeSourceReader has read."""
-
-  def __init__(self, position=None, percent_complete=None, remaining_time=None):
-
-    self._position = position
-
-    if percent_complete is not None:
-      percent_complete = float(percent_complete)
-      if percent_complete < 0 or percent_complete > 1:
-        raise ValueError(
-            'The percent_complete argument was %f. Must be in range [0, 1].'
-            % percent_complete)
-    self._percent_complete = percent_complete
-
-    self._remaining_time = remaining_time
-
-  @property
-  def position(self):
-    """Returns progress, represented as a ReaderPosition object."""
-    return self._position
-
-  @property
-  def percent_complete(self):
-    """Returns progress, represented as a percentage of total work.
-
-    Progress ranges from 0.0 (beginning, nothing complete) to 1.0 (end of the
-    work range, entire WorkItem complete).
-
-    Returns:
-      Progress represented as a percentage of total work.
-    """
-    return self._percent_complete
-
-  @property
-  def remaining_time(self):
-    """Returns progress, represented as an estimated time remaining."""
-    return self._remaining_time
-
-
-class ReaderPosition(object):
-  """A representation of position in an iteration of a 'NativeSourceReader'."""
-
-  def __init__(self, end=None, key=None, byte_offset=None, record_index=None,
-               shuffle_position=None, concat_position=None):
-    """Initializes ReaderPosition.
-
-    A ReaderPosition may get instantiated for one of these position types. Only
-    one of these should be specified.
-
-    Args:
-      end: position is past all other positions. For example, this may be used
-        to represent the end position of an unbounded range.
-      key: position is a string key.
-      byte_offset: position is a byte offset.
-      record_index: position is a record index
-      shuffle_position: position is a base64 encoded shuffle position.
-      concat_position: position is a 'ConcatPosition'.
-    """
-
-    self.end = end
-    self.key = key
-    self.byte_offset = byte_offset
-    self.record_index = record_index
-    self.shuffle_position = shuffle_position
-
-    if concat_position is not None:
-      assert isinstance(concat_position, ConcatPosition)
-    self.concat_position = concat_position
-
-
-class ConcatPosition(object):
-  """A position that encapsulates an inner position and an index.
-
-  This is used to represent the position of a source that encapsulates several
-  other sources.
-  """
-
-  def __init__(self, index, position):
-    """Initializes ConcatPosition.
-
-    Args:
-      index: index of the source currently being read.
-      position: inner position within the source currently being read.
-    """
-
-    if position is not None:
-      assert isinstance(position, ReaderPosition)
-    self.index = index
-    self.position = position
-
-
-class DynamicSplitRequest(object):
-  """Specifies how 'NativeSourceReader.request_dynamic_split' should split.
-  """
-
-  def __init__(self, progress):
-    assert isinstance(progress, ReaderProgress)
-    self.progress = progress
-
-
-class DynamicSplitResult(object):
-  pass
-
-
-class DynamicSplitResultWithPosition(DynamicSplitResult):
-
-  def __init__(self, stop_position):
-    assert isinstance(stop_position, ReaderPosition)
-    self.stop_position = stop_position
-
-
-class NativeSink(object):
-  """A sink implemented by Dataflow service.
-
-  This class is to be only inherited by sinks natively implemented by Cloud
-  Dataflow service, hence should not be sub-classed by users.
-  """
-
-  def writer(self):
-    """Returns a NativeSinkWriter for this sink."""
-    raise NotImplementedError
-
-  def __repr__(self):
-    return '<{name} {vals}>'.format(
-        name=self.__class__.__name__,
-        vals=_dict_printable_fields(self.__dict__, _minor_fields))
-
-
-class NativeSinkWriter(object):
-  """A writer for a sink implemented by Dataflow service."""
-
-  def __enter__(self):
-    """Opens everything necessary for a writer to function properly."""
-    raise NotImplementedError
-
-  def __exit__(self, exception_type, exception_value, traceback):
-    """Cleans up after a writer executed."""
-    raise NotImplementedError
-
-  @property
-  def takes_windowed_values(self):
-    """Returns whether this writer takes windowed values."""
-    return False
-
-  def Write(self, o):  # pylint: disable=invalid-name
-    """Writes a record to the sink associated with this writer."""
-    raise NotImplementedError
-
-
-# Encapsulates information about a bundle of a source generated when method
-# BoundedSource.split() is invoked.
-# This is a named 4-tuple that has the following fields.
-# * weight - a number that represents the size of the bundle. This value will
-#            be used to compare the relative sizes of bundles generated by the
-#            current source.
-#            The weight returned here could be specified using a unit of your
-#            choice (for example, bundles of sizes 100MB, 200MB, and 700MB may
-#            specify weights 100, 200, 700 or 1, 2, 7) but all bundles of a
-#            source should specify the weight using the same unit.
-# * source - a BoundedSource object for the bundle.
-# * start_position - starting position of the bundle
-# * stop_position - ending position of the bundle.
-#
-# Type for start and stop positions are specific to the bounded source and must
-# be consistent throughout.
-SourceBundle = namedtuple(
-    'SourceBundle',
-    'weight source start_position stop_position')
-
-
-class BoundedSource(object):
-  """A Dataflow source that reads a finite amount of input records.
-
-  This class defines following operations which can be used to read the source
-  efficiently.
-
-  * Size estimation - method ``estimate_size()`` may return an accurate
-    estimation in bytes for the size of the source.
-  * Splitting into bundles of a given size - method ``split()`` can be used to
-    split the source into a set of sub-sources (bundles) based on a desired
-    bundle size.
-  * Getting a RangeTracker - method ``get_range_tracker()`` should return
-    a ``RangeTracker`` object for a given position range for the position
-    type of the records returned by the source.
-  * Reading the data - method ``read()`` can be used to read data from the
-    source while respecting the boundaries defined by a given
-    ``RangeTracker``.
-  """
-
-  def estimate_size(self):
-    """Estimates the size of source in bytes.
-
-    An estimate of the total size (in bytes) of the data that would be read
-    from this source. This estimate is in terms of external storage size,
-    before performing decompression or other processing.
-
-    Returns:
-      estimated size of the source if the size can be determined, ``None``
-      otherwise.
-    """
-    raise NotImplementedError
-
-  def split(self, desired_bundle_size, start_position=None, stop_position=None):
-    """Splits the source into a set of bundles.
-
-    Bundles should be approximately of size ``desired_bundle_size`` bytes.
-
-    Args:
-      desired_bundle_size: the desired size (in bytes) of the bundles returned.
-      start_position: if specified the given position must be used as the
-                      starting position of the first bundle.
-      stop_position: if specified the given position must be used as the ending
-                     position of the last bundle.
-    Returns:
-      an iterator of objects of type 'SourceBundle' that gives information about
-      the generated bundles.
-    """
-    raise NotImplementedError
-
-  def get_range_tracker(self, start_position, stop_position):
-    """Returns a RangeTracker for a given position range.
-
-    Framework may invoke ``read()`` method with the RangeTracker object returned
-    here to read data from the source.
-    Args:
-      start_position: starting position of the range.
-      stop_position:  ending position of the range.
-    Returns:
-      a ``RangeTracker`` for the given position range.
-    """
-    raise NotImplementedError
-
-  def read(self, range_tracker):
-    """Returns an iterator that reads data from the source.
-
-    The returned set of data must respect the boundaries defined by the given
-    ``RangeTracker`` object. For example:
-
-      * Returned set of data must be for the range
-        ``[range_tracker.start_position, range_tracker.stop_position)``. Note
-        that a source may decide to return records that start after
-        ``range_tracker.stop_position``. See documentation in class
-        ``RangeTracker`` for more details. Also, note that framework might
-        invoke ``range_tracker.try_split()`` to perform dynamic split
-        operations. range_tracker.stop_position may be updated
-        dynamically due to successful dynamic split operations.
-      * Method ``range_tracker.try_claim()`` must be invoked for every record
-        that starts at a split point.
-      * Method ``range_tracker.set_current_position()`` may be invoked for
-        records that do not start at split points.
-
-    Args:
-      range_tracker: a ``RangeTracker`` whose boundaries must be respected
-                     when reading data from the source. If 'None' all records
-                     represented by the current source should be read.
-    Returns:
-      an iterator of data read by the source.
-    """
-    raise NotImplementedError
-
-  def default_output_coder(self):
-    """Coder that should be used for the records returned by the source."""
-    return PickleCoder()
-
-
-class RangeTracker(object):
-  """A thread safe object used by Dataflow source framework.
-
-  A Dataflow source is defined using a ``BoundedSource`` and a ``RangeTracker``
-  pair. A ``RangeTracker`` is used by Dataflow source framework to perform
-  dynamic work rebalancing of position-based sources.
-
-  **Position-based sources**
-
-  A position-based source is one where the source can be described by a range
-  of positions of an ordered type and the records returned by the reader can be
-  described by positions of the same type.
-
-  In case a record occupies a range of positions in the source, the most
-  important thing about the record is the position where it starts.
-
-  Defining the semantics of positions for a source is entirely up to the source
-  class, however the chosen definitions have to obey certain properties in order
-  to make it possible to correctly split the source into parts, including
-  dynamic splitting. Two main aspects need to be defined:
-
-  1. How to assign starting positions to records.
-  2. Which records should be read by a source with a range '[A, B)'.
-
-  Moreover, reading a range must be *efficient*, i.e., the performance of
-  reading a range should not significantly depend on the location of the range.
-  For example, reading the range [A, B) should not require reading all data
-  before 'A'.
-
-  The sections below explain exactly what properties these definitions must
-  satisfy, and how to use a ``RangeTracker`` with a properly defined source.
-
-  **Properties of position-based sources**
-
-  The main requirement for position-based sources is *associativity*: reading
-  records from '[A, B)' and records from '[B, C)' should give the same
-  records as reading from '[A, C)', where 'A <= B <= C'. This property
-  ensures that no matter how a range of positions is split into arbitrarily many
-  sub-ranges, the total set of records described by them stays the same.
-
-  The other important property is how the source's range relates to positions of
-  records in the source. In many sources each record can be identified by a
-  unique starting position. In this case:
-
-  * All records returned by a source '[A, B)' must have starting positions in
-    this range.
-  * All but the last record should end within this range. The last record may or
-    may not extend past the end of the range.
-  * Records should not overlap.
-
-  Such sources should define "read '[A, B)'" as "read from the first record
-  starting at or after 'A', up to but not including the first record starting
-  at or after 'B'".
-
-  Some examples of such sources include reading lines or CSV from a text file,
-  reading keys and values from a BigTable, etc.
-
-  The concept of *split points* allows us to extend the definitions for dealing
-  with sources where some records cannot be identified by a unique starting
-  position.
-
-  In all cases, all records returned by a source '[A, B)' must *start* at or
-  after 'A'.
-
-  **Split points**
-
-  Some sources may have records that are not directly addressable. For example,
-  imagine a file format consisting of a sequence of compressed blocks. Each
-  block can be assigned an offset, but records within the block cannot be
-  directly addressed without decompressing the block. Let us refer to this
-  hypothetical format as *CBF (Compressed Blocks Format)*.
-
-  Many such formats can still satisfy the associativity property. For example,
-  in CBF, reading '[A, B)' can mean "read all the records in all blocks whose
-  starting offset is in '[A, B)'".
-
-  To support such complex formats, we introduce the notion of *split points*. We
-  say that a record is a split point if there exists a position 'A' such that
-  the record is the first one to be returned when reading the range
-  '[A, infinity)'. In CBF, the only split points would be the first records
-  in each block.
-
-  Split points allow us to define the meaning of a record's position and a
-  source's range in all cases:
-
-  * For a record that is at a split point, its position is defined to be the
-    largest 'A' such that reading a source with the range '[A, infinity)'
-    returns this record.
-  * Positions of other records are only required to be non-decreasing.
-  * Reading the source '[A, B)' must return records starting from the first
-    split point at or after 'A', up to but not including the first split point
-    at or after 'B'. In particular, this means that the first record returned
-    by a source MUST always be a split point.
-  * Positions of split points must be unique.
-
-  As a result, for any decomposition of the full range of the source into
-  position ranges, the total set of records will be the full set of records in
-  the source, and each record will be read exactly once.
-
-  **Consumed positions**
-
-  As the source is being read, and records read from it are being passed to the
-  downstream transforms in the pipeline, we say that positions in the source are
-  being *consumed*. When a reader has read a record (or promised to a caller
-  that a record will be returned), positions up to and including the record's
-  start position are considered *consumed*.
-
-  Dynamic splitting can happen only at *unconsumed* positions. If the reader
-  just returned a record at offset 42 in a file, dynamic splitting can happen
-  only at offset 43 or beyond, as otherwise that record could be read twice (by
-  the current reader and by a reader of the task starting at 43).
-  """
-
-  def start_position(self):
-    """Returns the starting position of the current range, inclusive."""
-    raise NotImplementedError
-
-  def stop_position(self):
-    """Returns the ending position of the current range, exclusive."""
-    raise NotImplementedError
-
-  def try_claim(self, position):  # pylint: disable=unused-argument
-    """Atomically determines if a record at a split point is within the range.
-
-    This method should be called **if and only if** the record is at a split
-    point. This method may modify the internal state of the ``RangeTracker`` by
-    updating the last-consumed position to ``position``.
-
-    ** Thread safety **
-
-    This method along with several other methods of this class may be invoked by
-    multiple threads, hence must be made thread-safe, e.g. by using a single
-    lock object.
-
-    Args:
-      position: starting position of a record being read by a source.
-
-    Returns:
-      ``True``, if the given position falls within the current range, returns
-      ``False`` otherwise.
-    """
-    raise NotImplementedError
-
-  def set_current_position(self, position):
-    """Updates the last-consumed position to the given position.
-
-    A source may invoke this method for records that do not start at split
-    points. This may modify the internal state of the ``RangeTracker``. If the
-    record starts at a split point, method ``try_claim()`` **must** be invoked
-    instead of this method.
-
-    Args:
-      position: starting position of a record being read by a source.
-    """
-    raise NotImplementedError
-
-  def position_at_fraction(self, fraction):
-    """Returns the position at the given fraction.
-
-    Given a fraction within the range [0.0, 1.0) this method will return the
-    position at the given fraction compared to the position range
-    [self.start_position, self.stop_position).
-
-    ** Thread safety **
-
-    This method along with several other methods of this class may be invoked by
-    multiple threads, hence must be made thread-safe, e.g. by using a single
-    lock object.
-
-    Args:
-      fraction: a float value within the range [0.0, 1.0).
-    Returns:
-      a position within the range [self.start_position, self.stop_position).
-    """
-    raise NotImplementedError
-
-  def try_split(self, position):
-    """Atomically splits the current range.
-
-    Determines a position to split the current range, split_position, based on
-    the given position. In most cases split_position and position will be the
-    same.
-
-    Splits the current range '[self.start_position, self.stop_position)'
-    into a "primary" part '[self.start_position, split_position)' and a
-    "residual" part '[split_position, self.stop_position)', assuming the
-    current last-consumed position is within
-    '[self.start_position, split_position)' (i.e., split_position has not been
-    consumed yet).
-
-    If successful, updates the current range to be the primary and returns a
-    tuple (split_position, split_fraction). split_fraction should be the
-    fraction of size of range '[self.start_position, split_position)' compared
-    to the original (before split) range
-    '[self.start_position, self.stop_position)'.
-
-    If the split_position has already been consumed, returns ``None``.
-
-    ** Thread safety **
-
-    This method along with several other methods of this class may be invoked by
-    multiple threads, hence must be made thread-safe, e.g. by using a single
-    lock object.
-
-    Args:
-      position: suggested position where the current range should try to
-                be split at.
-    Returns:
-      a tuple containing the split position and split fraction.
-    """
-    raise NotImplementedError
-
-  def fraction_consumed(self):
-    """Returns the approximate fraction of consumed positions in the source.
-
-    ** Thread safety **
-
-    This method along with several other methods of this class may be invoked by
-    multiple threads, hence must be made thread-safe, e.g. by using a single
-    lock object.
-
-    Returns:
-      the approximate fraction of positions that have been consumed by
-      successful 'try_claim()' and 'set_current_position()' calls, or
-      0.0 if no such calls have happened.
-    """
-    raise NotImplementedError
-
-
-class Sink(object):
-  """A resource that can be written to using the ``df.io.Write`` transform.
-
-  Here ``df`` stands for Dataflow Python code imported in following manner.
-  ``import google.cloud.dataflow as df``.
-
-  A parallel write to an ``iobase.Sink`` consists of three phases:
-
-  1. A sequential *initialization* phase (e.g., creating a temporary output
-     directory, etc.)
-  2. A parallel write phase where workers write *bundles* of records
-  3. A sequential *finalization* phase (e.g., committing the writes, merging
-     output files, etc.)
-
-  For exact definition of a Dataflow bundle please see
-  https://cloud.google.com/dataflow/faq.
-
-  Implementing a new sink requires extending two classes.
-
-  1. iobase.Sink
-
-  ``iobase.Sink`` is an immutable logical description of the location/resource
-  to write to. Depending on the type of sink, it may contain fields such as the
-  path to an output directory on a filesystem, a database table name,
-  etc. ``iobase.Sink`` provides methods for performing a write operation to the
-  sink described by it. To this end, implementors of an extension of
-  ``iobase.Sink`` must implement three methods:
-  ``initialize_write()``, ``open_writer()``, and ``finalize_write()``.
-
-  2. iobase.Writer
-
-  ``iobase.Writer`` is used to write a single bundle of records. An
-  ``iobase.Writer`` defines two methods: ``write()`` which writes a
-  single record from the bundle and ``close()`` which is called once
-  at the end of writing a bundle.
-
-  See also ``df.io.fileio.FileSink`` which provides a simpler API for writing
-  sinks that produce files.
-
-  **Execution of the Write transform**
-
-  ``initialize_write()`` and ``finalize_write()`` are conceptually called once:
-  at the beginning and end of a ``Write`` transform. However, implementors must
-  ensure that these methods are *idempotent*, as they may be called multiple
-  times on different machines in the case of failure/retry or for redundancy.
-
-  ``initialize_write()`` should perform any initialization that needs to be done
-  prior to writing to the sink. ``initialize_write()`` may return a result
-  (let's call this ``init_result``) that contains any parameters it wants to
-  pass on to its writers about the sink. For example, a sink that writes to a
-  file system may return an ``init_result`` that contains a dynamically
-  generated unique directory to which data should be written.
-
-  To perform writing of a bundle of elements, Dataflow execution engine will
-  create an ``iobase.Writer`` using the implementation of
-  ``iobase.Sink.open_writer()``. When invoking ``open_writer()`` execution
-  engine will provide the ``init_result`` returned by ``initialize_write()``
-  invocation as well as a *bundle id* (let's call this ``bundle_id``) that is
-  unique for each invocation of ``open_writer()``.
-
-  Execution engine will then invoke ``iobase.Writer.write()`` implementation for
-  each element that has to be written. Once all elements of a bundle are
-  written, execution engine will invoke ``iobase.Writer.close()`` implementation
-  which should return a result (let's call this ``write_result``) that contains
-  information that encodes the result of the write and, in most cases, some
-  encoding of the unique bundle id. For example, if each bundle is written to a
-  unique temporary file, ``close()`` method may return an object that contains
-  the temporary file name. After writing of all bundles is complete, execution
-  engine will invoke ``finalize_write()`` implementation. As parameters to this
-  invocation execution engine will provide ``init_result`` as well as an
-  iterable of ``write_result``.
-
-  The execution of a write transform can be illustrated using following pseudo
-  code (assume that the outer for loop happens in parallel across many
-  machines)::
-
-    init_result = sink.initialize_write()
-    write_results = []
-    for bundle in partition(pcoll):
-      writer = sink.open_writer(init_result, generate_bundle_id())
-      for elem in bundle:
-        writer.write(elem)
-      write_results.append(writer.close())
-    sink.finalize_write(init_result, write_results)
-
-
-  **init_result**
-
-  Methods of 'iobase.Sink' should agree on the 'init_result' type that will be
-  returned when initializing the sink. This type can be a client-defined object
-  or an existing type. The returned type must be picklable using Dataflow coder
-  ``coders.PickleCoder``. Returning an init_result is optional.
-
-  **bundle_id**
-
-  In order to ensure fault-tolerance, a bundle may be executed multiple times
-  (e.g., in the event of failure/retry or for redundancy). However, exactly one
-  of these executions will have its result passed to the
-  ``iobase.Sink.finalize_write()`` method. Each call to
-  ``iobase.Sink.open_writer()`` is passed a unique bundle id when it is called
-  by the ``WriteImpl`` transform, so even redundant or retried bundles will have
-  a unique way of identifying their output.
-
-  The bundle id should be used to guarantee that a bundle's output is unique.
-  This uniqueness guarantee is important; if a bundle is to be output to a file,
-  for example, the name of the file must be unique to avoid conflicts with other
-  writers. The bundle id should be encoded in the writer result returned by the
-  writer and subsequently used by the ``finalize_write()`` method to identify
-  the results of successful writes.
-
-  For example, consider the scenario where a Writer writes files containing
-  serialized records and the ``finalize_write()`` is to merge or rename these
-  output files. In this case, a writer may use its unique id to name its output
-  file (to avoid conflicts) and return the name of the file it wrote as its
-  writer result. The ``finalize_write()`` will then receive an ``Iterable`` of
-  output file names that it can then merge or rename using some bundle naming
-  scheme.
-
-  **write_result**
-
-  ``iobase.Writer.close()`` and ``finalize_write()`` implementations must agree
-  on type of the ``write_result`` object returned when invoking
-  ``iobase.Writer.close()``. This type can be a client-defined object or
-  an existing type. The returned type must be picklable using Dataflow coder
-  ``coders.PickleCoder``. Returning a ``write_result`` when
-  ``iobase.Writer.close()`` is invoked is optional, but if unique
-  ``write_result`` objects are not returned, the sink should guarantee
-  idempotency when the same bundle is written multiple times due to
-  failure/retry or redundancy.
-
-
-  **More information**
-
-  For more information on creating new sinks please refer to the official
-  documentation at
-  ``https://cloud.google.com/dataflow/model/custom-io#creating-sinks``.
-  """
-
-  def initialize_write(self):
-    """Initializes the sink before writing begins.
-
-    Invoked before any data is written to the sink. This base implementation
-    only raises ``NotImplementedError``.
-
-    Please see documentation in ``iobase.Sink`` for an example.
-
-    Returns:
-      An object that contains any sink specific state generated by
-      initialization. This object will be passed to open_writer() and
-      finalize_write() methods.
-
-    Raises:
-      NotImplementedError: always, in this base implementation.
-    """
-    raise NotImplementedError
-
-  def open_writer(self, init_result, uid):
-    """Opens a writer for writing a bundle of elements to the sink.
-
-    This base implementation only raises ``NotImplementedError``.
-
-    Args:
-      init_result: the result of initialize_write() invocation.
-      uid: a unique identifier generated by the system.
-
-    Returns:
-      an ``iobase.Writer`` that can be used to write a bundle of records to the
-      current sink.
-
-    Raises:
-      NotImplementedError: always, in this base implementation.
-    """
-    raise NotImplementedError
-
-  def finalize_write(self, init_result, writer_results):
-    """Finalizes the sink after all data is written to it.
-
-    Given the result of initialization and an iterable of results from bundle
-    writes, performs finalization after writing and closes the sink. Called
-    after all bundle writes are complete.
-
-    The bundle write results that are passed to finalize are those returned by
-    bundles that completed successfully. Although bundles may have been run
-    multiple times (for fault-tolerance), only one writer result will be passed
-    to finalize for each bundle. An implementation of finalize should perform
-    clean up of any failed and successfully retried bundles.  Note that these
-    failed bundles will not have their writer result passed to finalize, so
-    finalize should be capable of locating any temporary/partial output written
-    by failed bundles.
-
-    If all retries of a bundle fail, the whole pipeline will fail *without*
-    finalize_write() being invoked.
-
-    A best practice is to make finalize atomic. If this is impossible given the
-    semantics of the sink, finalize should be idempotent, as it may be called
-    multiple times in the case of failure/retry or for redundancy.
-
-    Note that the iteration order of the writer results is not guaranteed to be
-    consistent if finalize is called multiple times.
-
-    Args:
-      init_result: the result of ``initialize_write()`` invocation.
-      writer_results: an iterable containing results of ``Writer.close()``
-        invocations. This will only contain results of successful writes, and
-        will only contain the result of a single successful write for a given
-        bundle.
-
-    Raises:
-      NotImplementedError: always, in this base implementation.
-    """
-    raise NotImplementedError
-
-
-class Writer(object):
-  """Writes a bundle of elements from a ``PCollection`` to a sink.
-
-  ``iobase.Writer.write()`` writes a single element to the sink, while
-  ``iobase.Writer.close()`` is called after all elements in the bundle have
-  been written.
-
-  Both methods only raise ``NotImplementedError`` in this base class.
-
-  See ``iobase.Sink`` for more detailed documentation about the process of
-  writing to a sink.
-  """
-
-  def write(self, value):
-    """Writes a value to the sink using the current writer.
-
-    Args:
-      value: the element to write.
-
-    Raises:
-      NotImplementedError: always, in this base implementation.
-    """
-    raise NotImplementedError
-
-  def close(self):
-    """Closes the current writer.
-
-    Please see documentation in ``iobase.Sink`` for an example.
-
-    Returns:
-      An object representing the writes that were performed by the current
-      writer.
-
-    Raises:
-      NotImplementedError: always, in this base implementation.
-    """
-    raise NotImplementedError
-
-
-class _NativeWrite(ptransform.PTransform):
-  """A PTransform for writing to a Dataflow native sink.
-
-  These are sinks that are implemented natively by the Dataflow service
-  and hence should not be updated by users. These sinks are processed
-  using a Dataflow native write transform.
-
-  Applying this transform results in a ``pvalue.PDone``.
-  """
-
-  def __init__(self, *args, **kwargs):
-    """Initializes a Write transform.
-
-    Args:
-      *args: A tuple of position arguments.
-      **kwargs: A dictionary of keyword arguments.
-
-    The *args, **kwargs are expected to be (label, sink) or (sink).
-    """
-    # parse_label_and_arg() splits the arguments into the optional label and
-    # the argument named 'sink'.
-    label, sink = self.parse_label_and_arg(args, kwargs, 'sink')
-    super(_NativeWrite, self).__init__(label)
-    self.sink = sink
-
-  def apply(self, pcoll):
-    """Checks the input and returns a ``pvalue.PDone`` for its pipeline.
-
-    Args:
-      pcoll: the input passed to ``_check_pcollection()``; its ``pipeline``
-        attribute is used to build the result.
-
-    Returns:
-      A ``pvalue.PDone`` constructed from ``pcoll.pipeline``.
-    """
-    self._check_pcollection(pcoll)
-    return pvalue.PDone(pcoll.pipeline)
-
-
-class Read(ptransform.PTransform):
-  """A transform that reads a PCollection.
-
-  The source to read from is stored on the transform as ``self.source``.
-  """
-
-  def __init__(self, *args, **kwargs):
-    """Initializes a Read transform.
-
-    Args:
-      *args: A tuple of position arguments.
-      **kwargs: A dictionary of keyword arguments.
-
-    The *args, **kwargs are expected to be (label, source) or (source).
-    """
-    # parse_label_and_arg() splits the arguments into the optional label and
-    # the argument named 'source'.
-    label, source = self.parse_label_and_arg(args, kwargs, 'source')
-    super(Read, self).__init__(label)
-    self.source = source
-
-  def apply(self, pbegin):
-    """Returns a new ``pvalue.PCollection`` in the pipeline of ``pbegin``.
-
-    Args:
-      pbegin: must be a ``pvalue.PBegin``; this is checked with an ``assert``.
-
-    Returns:
-      A ``pvalue.PCollection`` constructed from ``pbegin.pipeline``.
-    """
-    assert isinstance(pbegin, pvalue.PBegin)
-    # The pipeline is remembered on the transform itself.
-    self.pipeline = pbegin.pipeline
-    return pvalue.PCollection(self.pipeline)
-
-  def get_windowing(self, unused_inputs):
-    """Returns a ``core.Windowing`` over global windows; inputs are ignored."""
-    return core.Windowing(window.GlobalWindows())
-
-
-class Write(ptransform.PTransform):
-  """A ``PTransform`` that writes to a sink.
-
-  A sink should inherit ``iobase.Sink``. Such implementations are
-  handled using a composite transform that consists of three ``ParDo``s -
-  (1) a ``ParDo`` performing a global initialization (2) a ``ParDo`` performing
-  a parallel write and (3) a ``ParDo`` performing a global finalization. In the
-  case of an empty ``PCollection``, only the global initialization and
-  finalization will be performed. Currently only batch workflows support custom
-  sinks.
-
-  Example usage::
-
-      pcollection | df.io.Write(MySink())
-
-  This returns a ``pvalue.PValue`` object that represents the end of the
-  Pipeline.
-
-  The sink argument may also be a full PTransform, in which case it will be
-  applied directly.  This allows composite sink-like transforms (e.g. a sink
-  with some pre-processing DoFns) to be used the same as all other sinks.
-
-  This transform also supports sinks that inherit ``iobase.NativeSink``. These
-  are sinks that are implemented natively by the Dataflow service and hence
-  should not be updated by users. These sinks are processed using a Dataflow
-  native write transform.
-  """
-
-  def __init__(self, *args, **kwargs):
-    """Initializes a Write transform.
-
-    Args:
-      *args: A tuple of position arguments.
-      **kwargs: A dictionary of keyword arguments.
-
-    The *args, **kwargs are expected to be (label, sink) or (sink).
-    """
-    label, sink = self.parse_label_and_arg(args, kwargs, 'sink')
-    super(Write, self).__init__(label)
-    self.sink = sink
-
-  def apply(self, pcoll):
-    """Applies the write that matches the type of ``self.sink``.
-
-    The checks are ordered: ``iobase.NativeSink`` first, then ``iobase.Sink``,
-    then ``ptransform.PTransform``.
-
-    Args:
-      pcoll: the ``PCollection`` to write.
-
-    Returns:
-      The result of applying the selected write transform to ``pcoll``.
-
-    Raises:
-      ValueError: if the sink is none of the supported types.
-    """
-    from google.cloud.dataflow.io import iobase
-    if isinstance(self.sink, iobase.NativeSink):
-      # A native sink
-      return pcoll | _NativeWrite('native_write', self.sink)
-    elif isinstance(self.sink, iobase.Sink):
-      # A custom sink
-      return pcoll | WriteImpl(self.sink)
-    elif isinstance(self.sink, ptransform.PTransform):
-      # This allows "composite" sinks to be used like non-composite ones.
-      return pcoll | self.sink
-    else:
-      # Interpolate the sink into the message. Passing it as a second
-      # argument to ValueError would leave the '%r' placeholder unformatted.
-      # The one-element tuple keeps this correct even if the sink is a tuple.
-      raise ValueError('A sink must inherit iobase.Sink, iobase.NativeSink, '
-                       'or be a PTransform. Received : %r' % (self.sink,))
-
-
-class WriteImpl(ptransform.PTransform):
-  """Implements the writing of custom sinks.
-
-  The write is expressed as three steps: a one-time call to
-  ``sink.initialize_write()``, a step that writes the elements through writers
-  opened on the sink, and a one-time call to ``_finalize_write``.
-  """
-
-  def __init__(self, sink):
-    """Stores the sink to write to.
-
-    Args:
-      sink: the sink whose initialize_write(), open_writer() and
-        finalize_write() methods are used by this transform.
-    """
-    super(WriteImpl, self).__init__()
-    self.sink = sink
-
-  def apply(self, pcoll):
-    """Builds the initialize / write / finalize steps for ``pcoll``.
-
-    Args:
-      pcoll: the collection of elements to write.
-
-    Returns:
-      The result of applying the 'finalize_write' step.
-    """
-    # A collection holding the single element None, used to run the
-    # initialization and finalization steps.
-    do_once = pcoll.pipeline | core.Create('DoOnce', [None])
-    init_result_coll = do_once | core.Map(
-        'initialize_write', lambda _, sink: sink.initialize_write(), self.sink)
-    # Sinks without a num_shards attribute (or with a falsy one) take the
-    # else branch below.
-    if getattr(self.sink, 'num_shards', 0):
-      min_shards = self.sink.num_shards
-      if min_shards == 1:
-        # A single shard: every element gets the same key (None).
-        keyed_pcoll = pcoll | core.Map(lambda x: (None, x))
-      else:
-        # Several shards: keys are assigned by _RoundRobinKeyFn.
-        keyed_pcoll = pcoll | core.ParDo(_RoundRobinKeyFn(min_shards))
-      # Each group produced by GroupByKey is written by one call to
-      # _write_keyed_bundle.
-      write_result_coll = (keyed_pcoll
-                           | core.WindowInto(window.GlobalWindows())
-                           | core.GroupByKey()
-                           | core.Map('write_bundles',
-                                      _write_keyed_bundle, self.sink,
-                                      AsSingleton(init_result_coll)))
-    else:
-      min_shards = 1
-      write_result_coll = pcoll | core.ParDo('write_bundles',
-                                             _WriteBundleDoFn(), self.sink,
-                                             AsSingleton(init_result_coll))
-    # _finalize_write receives the sink, the initialization result, the
-    # collected write results and the minimum number of shards.
-    return do_once | core.FlatMap(
-        'finalize_write',
-        _finalize_write,
-        self.sink,
-        AsSingleton(init_result_coll),
-        AsIter(write_result_coll),
-        min_shards)
-
-
-class _WriteBundleDoFn(core.DoFn):
-  """A DoFn for writing elements to an iobase.Writer.
-
-  Opens a writer at the first element and closes the writer at finish_bundle().
-  After closing, the writer reference is dropped so that a later bundle
-  processed by the same instance opens a fresh writer.
-  """
-
-  def __init__(self):
-    # None means "no writer opened for the current bundle yet".
-    self.writer = None
-
-  def process(self, context, sink, init_result):
-    """Writes ``context.element``, opening the writer on first use.
-
-    Args:
-      context: the processing context; its ``element`` attribute is written.
-      sink: the sink on which the writer is opened.
-      init_result: the result of the sink initialization, forwarded to
-        ``sink.open_writer()``.
-    """
-    if self.writer is None:
-      self.writer = sink.open_writer(init_result, str(uuid.uuid4()))
-    self.writer.write(context.element)
-
-  def finish_bundle(self, context, *args, **kwargs):
-    """Closes the writer, if one was opened, and emits its result.
-
-    Yields:
-      A ``window.TimestampedValue`` wrapping the value returned by
-      ``writer.close()``, stamped with ``window.MAX_TIMESTAMP``. Nothing is
-      yielded when no element was processed.
-    """
-    if self.writer is not None:
-      write_result = self.writer.close()
-      # Forget the closed writer; otherwise a subsequent bundle handled by
-      # this instance would write to an already closed writer.
-      self.writer = None
-      yield window.TimestampedValue(write_result, window.MAX_TIMESTAMP)
-
-
-def _write_keyed_bundle(bundle, sink, init_result):
-  """Writes the values of one keyed bundle through a freshly opened writer.
-
-  Args:
-    bundle: an indexable pair whose second item is the iterable of values.
-    sink: the sink on which the writer is opened.
-    init_result: the sink initialization result, forwarded to open_writer().
-
-  Returns:
-    A ``window.TimestampedValue`` wrapping the result of ``writer.close()``,
-    stamped with ``window.MAX_TIMESTAMP``.
-  """
-  bundle_writer = sink.open_writer(init_result, str(uuid.uuid4()))
-  values = bundle[1]
-  for value in values:
-    bundle_writer.write(value)
-  write_result = bundle_writer.close()
-  return window.TimestampedValue(write_result, window.MAX_TIMESTAMP)
-
-
-def _finalize_write(_, sink, init_result, write_results, min_shards):
-  """Pads the write results up to ``min_shards`` and finalizes the sink.
-
-  Args:
-    _: ignored.
-    sink: the sink being finalized.
-    init_result: the sink initialization result.
-    write_results: an iterable of results returned by writers.
-    min_shards: the minimum number of write results handed to the sink; the
-      shortfall is made up by opening and immediately closing extra writers.
-
-  Returns:
-    A generator of ``window.TimestampedValue`` objects wrapping the outputs of
-    ``sink.finalize_write()``, or None when those outputs are falsy.
-  """
-  write_results = list(write_results)
-  missing_shards = min_shards - len(write_results)
-  extra_shards = []
-  if missing_shards > 0:
-    logging.debug('Creating %s empty shard(s).', missing_shards)
-    while len(extra_shards) < missing_shards:
-      empty_writer = sink.open_writer(init_result, str(uuid.uuid4()))
-      extra_shards.append(empty_writer.close())
-  all_results = write_results + extra_shards
-  outputs = sink.finalize_write(init_result, all_results)
-  if not outputs:
-    return None
-  return (window.TimestampedValue(v, window.MAX_TIMESTAMP) for v in outputs)
-
-
-class _RoundRobinKeyFn(core.DoFn):
-  """A DoFn that pairs each element with a key cycling through ``count`` values.
-
-  The cycle starts at a random position for every bundle.
-  """
-
-  def __init__(self, count):
-    self.count = count
-
-  def start_bundle(self, context):
-    # Pick the starting point of the cycle at random for this bundle.
-    upper_bound = self.count - 1
-    self.counter = random.randint(0, upper_bound)
-
-  def process(self, context):
-    # Advance the counter, wrapping around once it reaches the key count.
-    next_key = self.counter + 1
-    if next_key >= self.count:
-      next_key -= self.count
-    self.counter = next_key
-    yield next_key, context.element

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/io/pubsub.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/io/pubsub.py b/sdks/python/google/cloud/dataflow/io/pubsub.py
deleted file mode 100644
index 88aa7f5..0000000
--- a/sdks/python/google/cloud/dataflow/io/pubsub.py
+++ /dev/null
@@ -1,73 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Google Cloud PubSub sources and sinks.
-
-Cloud Pub/Sub sources and sinks are currently supported only in streaming
-pipelines, during remote execution.
-"""
-
-from __future__ import absolute_import
-
-from google.cloud.dataflow import coders
-from google.cloud.dataflow.io import iobase
-
-
-class PubSubSource(iobase.NativeSource):
-  """Source for reading from a given Cloud Pub/Sub topic.
-
-  Attributes:
-    topic: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>".
-    subscription: Optional existing Cloud Pub/Sub subscription to use in the
-      form "projects/<project>/subscriptions/<subscription>".
-    id_label: The attribute on incoming Pub/Sub messages to use as a unique
-      record identifier.  When specified, the value of this attribute (which can
-      be any string that uniquely identifies the record) will be used for
-      deduplication of messages.  If not provided, Dataflow cannot guarantee
-      that no duplicate data will be delivered on the Pub/Sub stream. In this
-      case, deduplication of the stream will be strictly best effort.
-    coder: The Coder to use for decoding incoming Pub/Sub messages.
-  """
-
-  # Note: the default coder instance is created once, when this method is
-  # defined, and is shared by every source that relies on the default.
-  def __init__(self, topic, subscription=None, id_label=None,
-               coder=coders.StrUtf8Coder()):
-    """Stores the topic, subscription, id label and coder on the source."""
-    self.topic = topic
-    self.subscription = subscription
-    self.id_label = id_label
-    self.coder = coder
-
-  @property
-  def format(self):
-    """Source format name required for remote execution."""
-    return 'pubsub'
-
-  def reader(self):
-    """Always raises: this source cannot be read in local execution.
-
-    Raises:
-      NotImplementedError: always.
-    """
-    raise NotImplementedError(
-        'PubSubSource is not supported in local execution.')
-
-
-class PubSubSink(iobase.NativeSink):
-  """Sink for writing to a given Cloud Pub/Sub topic.
-
-  Attributes:
-    topic: the Cloud Pub/Sub topic given at construction.
-    coder: the coder given at construction (a ``coders.StrUtf8Coder`` by
-      default).
-  """
-
-  # Note: the default coder instance is created once, when this method is
-  # defined, and is shared by every sink that relies on the default.
-  def __init__(self, topic, coder=coders.StrUtf8Coder()):
-    """Stores the topic and coder on the sink."""
-    self.topic = topic
-    self.coder = coder
-
-  @property
-  def format(self):
-    """Sink format name required for remote execution."""
-    return 'pubsub'
-
-  def writer(self):
-    """Always raises: this sink cannot be written to in local execution.
-
-    Raises:
-      NotImplementedError: always.
-    """
-    raise NotImplementedError(
-        'PubSubSink is not supported in local execution.')

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/io/range_trackers.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/io/range_trackers.py b/sdks/python/google/cloud/dataflow/io/range_trackers.py
deleted file mode 100644
index 2cdcd5b..0000000
--- a/sdks/python/google/cloud/dataflow/io/range_trackers.py
+++ /dev/null
@@ -1,270 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""iobase.RangeTracker implementations provided with Dataflow SDK.
-"""
-
-import logging
-import math
-import threading
-
-from google.cloud.dataflow.io import iobase
-
-
-class OffsetRangeTracker(iobase.RangeTracker):
-  """A 'RangeTracker' for non-negative positions of type 'long'.
-
-  Tracks the start offset of the last returned record and of the last claimed
-  split point within a range of offsets. Split points at or beyond the stop
-  offset cannot be claimed. All state changes happen under an internal lock.
-  """
-
-  # Offset corresponding to infinity. This can only be used as the upper-bound
-  # of a range, and indicates reading all of the records until the end without
-  # specifying exactly what the end is.
-  # Infinite ranges cannot be split because it is impossible to estimate
-  # progress within them.
-  OFFSET_INFINITY = float('inf')
-
-  def __init__(self, start, end):
-    """Creates a tracker for the offsets from ``start`` up to ``end``.
-
-    Args:
-      start: the start offset of the range.
-      end: the stop offset of the range; may be ``OFFSET_INFINITY``.
-    """
-    super(OffsetRangeTracker, self).__init__()
-    self._start_offset = start
-    self._stop_offset = end
-    # -1 means "no record returned yet" / "no split point claimed yet".
-    self._last_record_start = -1
-    self._offset_of_last_split_point = -1
-    self._lock = threading.Lock()
-
-  def start_position(self):
-    """Returns the start offset of the range."""
-    return self._start_offset
-
-  def stop_position(self):
-    """Returns the current stop offset of the range."""
-    return self._stop_offset
-
-  @property
-  def last_record_start(self):
-    """The start offset of the last returned record, or -1 if none yet."""
-    return self._last_record_start
-
-  def _validate_record_start(self, record_start, split_point):
-    """Checks that a record starting at ``record_start`` may be returned.
-
-    Args:
-      record_start: the start offset of the record about to be returned.
-      split_point: whether the record is at a split point.
-
-    Raises:
-      ValueError: if the lock is not held, if the record starts before the
-        last returned record, if a split point repeats the offset of the
-        previous split point, or if the first record is not at a split point.
-    """
-    # This function must only be called under the lock self._lock.
-    if not self._lock.locked():
-      raise ValueError(
-          'This function must only be called under the lock self.lock.')
-
-    if record_start < self._last_record_start:
-      raise ValueError(
-          'Trying to return a record [starting at %d] which is before the '
-          'last-returned record [starting at %d]' %
-          (record_start, self._last_record_start))
-
-    # Offsets are compared by value (==), never by identity.
-    if split_point:
-      if (self._offset_of_last_split_point != -1 and
-          record_start == self._offset_of_last_split_point):
-        raise ValueError(
-            'Record at a split point has same offset as the previous split '
-            'point: %d' % record_start)
-    elif self._last_record_start == -1:
-      raise ValueError(
-          'The first record [starting at %d] must be at a split point' %
-          record_start)
-
-  def try_claim(self, record_start):
-    """Attempts to claim the split point at ``record_start``.
-
-    Args:
-      record_start: the start offset of a record at a split point.
-
-    Returns:
-      True if the split point was claimed; False if it is at or beyond the
-      stop offset.
-
-    Raises:
-      ValueError: if ``record_start`` fails validation.
-    """
-    with self._lock:
-      self._validate_record_start(record_start, True)
-      if record_start >= self.stop_position():
-        return False
-      self._offset_of_last_split_point = record_start
-      self._last_record_start = record_start
-      return True
-
-  def set_current_position(self, record_start):
-    """Records the start of a record that is not at a split point.
-
-    Args:
-      record_start: the start offset of the record.
-
-    Raises:
-      ValueError: if ``record_start`` fails validation.
-    """
-    with self._lock:
-      self._validate_record_start(record_start, False)
-      self._last_record_start = record_start
-
-  def try_split(self, split_offset):
-    """Attempts to shrink the range so that it stops at ``split_offset``.
-
-    Args:
-      split_offset: the proposed new stop offset.
-
-    Returns:
-      None if the split is refused. Otherwise a tuple
-      ``(new_stop_offset, split_fraction)`` where ``split_fraction`` is the
-      fraction of the range before the split that is retained by this tracker.
-    """
-    with self._lock:
-      if self._stop_offset == OffsetRangeTracker.OFFSET_INFINITY:
-        logging.debug('refusing to split %r at %d: stop position unspecified',
-                      self, split_offset)
-        return
-      if self._last_record_start == -1:
-        logging.debug('Refusing to split %r at %d: unstarted', self,
-                      split_offset)
-        return
-
-      if split_offset <= self._last_record_start:
-        logging.debug(
-            'Refusing to split %r at %d: already past proposed stop offset',
-            self, split_offset)
-        return
-      if (split_offset < self.start_position()
-          or split_offset >= self.stop_position()):
-        logging.debug(
-            'Refusing to split %r at %d: proposed split position out of range',
-            self, split_offset)
-        return
-
-      logging.debug('Agreeing to split %r at %d', self, split_offset)
-
-      # The fraction must be computed against the stop offset *before* the
-      # split; computing it after the update would always give 1.0 (or divide
-      # by zero when split_offset equals the start offset). The checks above
-      # guarantee start <= split_offset < stop, so the divisor is positive.
-      split_fraction = (float(split_offset - self._start_offset) / (
-          self._stop_offset - self._start_offset))
-      self._stop_offset = split_offset
-
-      return self._stop_offset, split_fraction
-
-  def fraction_consumed(self):
-    """Returns the consumed fraction of the range, bounded to [0, 1]."""
-    with self._lock:
-      fraction = ((1.0 * (self._last_record_start - self.start_position()) /
-                   (self.stop_position() - self.start_position())) if
-                  self.stop_position() != self.start_position() else 0.0)
-
-      # self._last_record_start may become larger than self._stop_offset when
-      # reading the records since any record that starts before the first 'split
-      # point' at or after the defined 'stop offset' is considered to be within
-      # the range of the OffsetRangeTracker. Hence fraction could be > 1.
-      # self._last_record_start is initialized to -1, hence fraction may be < 0.
-      # Bounding the fraction to the range [0, 1].
-      return max(0.0, min(1.0, fraction))
-
-  def position_at_fraction(self, fraction):
-    """Returns the offset located at ``fraction`` of the range, rounded up.
-
-    Args:
-      fraction: the fraction of the range.
-
-    Raises:
-      Exception: if the stop offset is ``OFFSET_INFINITY``.
-    """
-    if self.stop_position() == OffsetRangeTracker.OFFSET_INFINITY:
-      raise Exception(
-          'get_position_for_fraction_consumed is not applicable for an '
-          'unbounded range')
-    return (math.ceil(self.start_position() + fraction * (
-        self.stop_position() - self.start_position())))
-
-
-class GroupedShuffleRangeTracker(iobase.RangeTracker):
-  """A 'RangeTracker' for positions used by 'GroupedShuffleReader'.
-
-  These positions roughly correspond to hashes of keys. In case of hash
-  collisions, multiple groups can have the same position. In that case, the
-  first group at a particular position is considered a split point (because
-  it is the first to be returned when reading a position range starting at this
-  position), others are not.
-  """
-
-  def __init__(self, decoded_start_pos, decoded_stop_pos):
-    """Creates a tracker for the given decoded start and stop positions.
-
-    Args:
-      decoded_start_pos: the start position; a falsy value disables the
-        lower-bound checks performed by this tracker.
-      decoded_stop_pos: the stop position; a falsy value disables the
-        upper-bound checks performed by this tracker.
-    """
-    super(GroupedShuffleRangeTracker, self).__init__()
-    self._decoded_start_pos = decoded_start_pos
-    self._decoded_stop_pos = decoded_stop_pos
-    # None until the first group has been returned.
-    self._decoded_last_group_start = None
-    self._last_group_was_at_a_split_point = False
-    self._lock = threading.Lock()
-
-  def start_position(self):
-    """Returns the decoded start position."""
-    return self._decoded_start_pos
-
-  def stop_position(self):
-    """Returns the current decoded stop position."""
-    return self._decoded_stop_pos
-
-  def last_group_start(self):
-    """Returns the position of the last returned group, or None if none yet."""
-    return self._decoded_last_group_start
-
-  def _validate_decoded_group_start(self, decoded_group_start, split_point):
-    """Checks that a group at ``decoded_group_start`` may be returned.
-
-    Args:
-      decoded_group_start: the position of the group about to be returned.
-      split_point: whether the group is at a split point.
-
-    Raises:
-      ValueError: if the position is before the start position or before the
-        last returned group, if a split point repeats the position of the
-        previous group, if the first group is not at a split point, or if a
-        group not at a split point has a different position than the previous
-        group.
-    """
-    # The bound is only checked when the start position is truthy.
-    if self.start_position() and decoded_group_start < self.start_position():
-      raise ValueError('Trying to return record at %r which is before the'
-                       ' starting position at %r' %
-                       (decoded_group_start, self.start_position()))
-
-    if (self.last_group_start() and
-        decoded_group_start < self.last_group_start()):
-      raise ValueError('Trying to return group at %r which is before the'
-                       ' last-returned group at %r' %
-                       (decoded_group_start, self.last_group_start()))
-    if (split_point and self.last_group_start() and
-        self.last_group_start() == decoded_group_start):
-      raise ValueError('Trying to return a group at a split point with '
-                       'same position as the previous group: both at %r, '
-                       'last group was %sat a split point.' %
-                       (decoded_group_start,
-                        ('' if self._last_group_was_at_a_split_point
-                         else 'not ')))
-    if not split_point:
-      if self.last_group_start() is None:
-        raise ValueError('The first group [at %r] must be at a split point' %
-                         decoded_group_start)
-      if self.last_group_start() != decoded_group_start:
-        # This case is not a violation of general RangeTracker semantics, but it
-        # is contrary to how GroupingShuffleReader in particular works. Hitting
-        # it would mean it's behaving unexpectedly.
-        raise ValueError('Trying to return a group not at a split point, but '
-                         'with a different position than the previous group: '
-                         'last group was %r at %r, current at a %s split'
-                         ' point.' %
-                         (self.last_group_start()
-                          , decoded_group_start
-                          , ('' if self._last_group_was_at_a_split_point
-                             else 'non-')))
-
-  def try_claim(self, decoded_group_start):
-    """Attempts to claim the split point at ``decoded_group_start``.
-
-    Returns:
-      True if the group was claimed; False if a stop position is set and the
-      group is at or beyond it.
-
-    Raises:
-      ValueError: if ``decoded_group_start`` fails validation.
-    """
-    with self._lock:
-      self._validate_decoded_group_start(decoded_group_start, True)
-      if (self.stop_position()
-          and decoded_group_start >= self.stop_position()):
-        return False
-
-      self._decoded_last_group_start = decoded_group_start
-      self._last_group_was_at_a_split_point = True
-      return True
-
-  def set_current_position(self, decoded_group_start):
-    """Records the position of a group that is not at a split point.
-
-    Raises:
-      ValueError: if ``decoded_group_start`` fails validation.
-    """
-    with self._lock:
-      self._validate_decoded_group_start(decoded_group_start, False)
-      self._decoded_last_group_start = decoded_group_start
-      self._last_group_was_at_a_split_point = False
-
-  def try_split(self, decoded_split_position):
-    """Attempts to move the stop position to ``decoded_split_position``.
-
-    Returns:
-      None if the split is refused (no group returned yet, position not past
-      the last returned group, or position out of range). Otherwise a tuple
-      of the new stop position and the fixed fraction 0.5.
-    """
-    with self._lock:
-      if self.last_group_start() is None:
-        logging.info('Refusing to split %r at %r: unstarted'
-                     , self, decoded_split_position)
-        return
-
-      if decoded_split_position <= self.last_group_start():
-        logging.info('Refusing to split %r at %r: already past proposed split '
-                     'position'
-                     , self, decoded_split_position)
-        return
-
-      # Each bound is only checked when the corresponding position is truthy.
-      if ((self.stop_position()
-           and decoded_split_position >= self.stop_position())
-          or (self.start_position()
-              and decoded_split_position <= self.start_position())):
-        logging.error('Refusing to split %r at %r: proposed split position out '
-                      'of range', self, decoded_split_position)
-        return
-
-      logging.debug('Agreeing to split %r at %r'
-                    , self, decoded_split_position)
-      self._decoded_stop_pos = decoded_split_position
-
-      # Since GroupedShuffleRangeTracker cannot determine relative sizes of the
-      # two splits, returning 0.5 as the fraction below so that the framework
-      # assumes the splits to be of the same size.
-      return self._decoded_stop_pos, 0.5
-
-  def fraction_consumed(self):
-    """Not supported by this tracker.
-
-    Raises:
-      RuntimeError: always.
-    """
-    # GroupingShuffle sources have special support on the service and the
-    # service will estimate progress from positions for us.
-    raise RuntimeError('GroupedShuffleRangeTracker does not measure fraction'
-                       ' consumed due to positions being opaque strings'
-                       ' that are interpretted by the service')

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/io/range_trackers_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/io/range_trackers_test.py b/sdks/python/google/cloud/dataflow/io/range_trackers_test.py
deleted file mode 100644
index 709d594..0000000
--- a/sdks/python/google/cloud/dataflow/io/range_trackers_test.py
+++ /dev/null
@@ -1,318 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Unit tests for the range_trackers module."""
-
-import array
-import copy
-import logging
-import unittest
-
-
-from google.cloud.dataflow.io import range_trackers
-
-
-class OffsetRangeTrackerTest(unittest.TestCase):
-
-  def test_try_return_record_simple_sparse(self):
-    tracker = range_trackers.OffsetRangeTracker(100, 200)
-    self.assertTrue(tracker.try_claim(110))
-    self.assertTrue(tracker.try_claim(140))
-    self.assertTrue(tracker.try_claim(183))
-    self.assertFalse(tracker.try_claim(210))
-
-  def test_try_return_record_simple_dense(self):
-    tracker = range_trackers.OffsetRangeTracker(3, 6)
-    self.assertTrue(tracker.try_claim(3))
-    self.assertTrue(tracker.try_claim(4))
-    self.assertTrue(tracker.try_claim(5))
-    self.assertFalse(tracker.try_claim(6))
-
-  def test_try_return_record_continuous_until_split_point(self):
-    tracker = range_trackers.OffsetRangeTracker(9, 18)
-    # Return records with gaps of 2; every 3rd record is a split point.
-    self.assertTrue(tracker.try_claim(10))
-    tracker.set_current_position(12)
-    tracker.set_current_position(14)
-    self.assertTrue(tracker.try_claim(16))
-    # Out of range, but not a split point...
-    tracker.set_current_position(18)
-    tracker.set_current_position(20)
-    # Out of range AND a split point.
-    self.assertFalse(tracker.try_claim(22))
-
-  def test_split_at_offset_fails_if_unstarted(self):
-    tracker = range_trackers.OffsetRangeTracker(100, 200)
-    self.assertFalse(tracker.try_split(150))
-
-  def test_split_at_offset(self):
-    tracker = range_trackers.OffsetRangeTracker(100, 200)
-    self.assertTrue(tracker.try_claim(110))
-    # Example positions we shouldn't split at, when last record starts at 110:
-    self.assertFalse(tracker.try_split(109))
-    self.assertFalse(tracker.try_split(110))
-    self.assertFalse(tracker.try_split(200))
-    self.assertFalse(tracker.try_split(210))
-    # Example positions we *should* split at:
-    self.assertTrue(copy.copy(tracker).try_split(111))
-    self.assertTrue(copy.copy(tracker).try_split(129))
-    self.assertTrue(copy.copy(tracker).try_split(130))
-    self.assertTrue(copy.copy(tracker).try_split(131))
-    self.assertTrue(copy.copy(tracker).try_split(150))
-    self.assertTrue(copy.copy(tracker).try_split(199))
-
-    # If we split at 170 and then at 150:
-    self.assertTrue(tracker.try_split(170))
-    self.assertTrue(tracker.try_split(150))
-    # Should be able  to return a record starting before the new stop offset.
-    # Returning records starting at the same offset is ok.
-    self.assertTrue(copy.copy(tracker).try_claim(135))
-    self.assertTrue(copy.copy(tracker).try_claim(135))
-    # Should be able to return a record starting right before the new stop
-    # offset.
-    self.assertTrue(copy.copy(tracker).try_claim(149))
-    # Should not be able to return a record starting at or after the new stop
-    # offset.
-    self.assertFalse(tracker.try_claim(150))
-    self.assertFalse(tracker.try_claim(151))
-    # Should accept non-splitpoint records starting after stop offset.
-    tracker.set_current_position(135)
-    tracker.set_current_position(152)
-    tracker.set_current_position(160)
-    tracker.set_current_position(171)
-
-  def test_get_position_for_fraction_dense(self):
-    # Represents positions 3, 4, 5.
-    tracker = range_trackers.OffsetRangeTracker(3, 6)
-    # [3, 3) represents 0.0 of [3, 6)
-    self.assertEqual(3, tracker.position_at_fraction(0.0))
-    # [3, 4) represents up to 1/3 of [3, 6)
-    self.assertEqual(4, tracker.position_at_fraction(1.0 / 6))
-    self.assertEqual(4, tracker.position_at_fraction(0.333))
-    # [3, 5) represents up to 2/3 of [3, 6)
-    self.assertEqual(5, tracker.position_at_fraction(0.334))
-    self.assertEqual(5, tracker.position_at_fraction(0.666))
-    # Any fraction consumed over 2/3 means the whole [3, 6) has been consumed.
-    self.assertEqual(6, tracker.position_at_fraction(0.667))
-
-  def test_get_fraction_consumed_dense(self):
-    tracker = range_trackers.OffsetRangeTracker(3, 6)
-    self.assertEqual(0, tracker.fraction_consumed())
-    self.assertTrue(tracker.try_claim(3))
-    self.assertEqual(0.0, tracker.fraction_consumed())
-    self.assertTrue(tracker.try_claim(4))
-    self.assertEqual(1.0 / 3, tracker.fraction_consumed())
-    self.assertTrue(tracker.try_claim(5))
-    self.assertEqual(2.0 / 3, tracker.fraction_consumed())
-    tracker.set_current_position(6)
-    self.assertEqual(1.0, tracker.fraction_consumed())
-    tracker.set_current_position(7)
-    self.assertFalse(tracker.try_claim(7))
-
-  def test_get_fraction_consumed_sparse(self):
-    tracker = range_trackers.OffsetRangeTracker(100, 200)
-    self.assertEqual(0, tracker.fraction_consumed())
-    self.assertTrue(tracker.try_claim(110))
-    # Consumed positions through 110 = total 10 positions of 100 done.
-    self.assertEqual(0.10, tracker.fraction_consumed())
-    self.assertTrue(tracker.try_claim(150))
-    self.assertEqual(0.50, tracker.fraction_consumed())
-    self.assertTrue(tracker.try_claim(195))
-    self.assertEqual(0.95, tracker.fraction_consumed())
-
-  def test_everything_with_unbounded_range(self):
-    tracker = range_trackers.OffsetRangeTracker(
-        100,
-        range_trackers.OffsetRangeTracker.OFFSET_INFINITY)
-    self.assertTrue(tracker.try_claim(150))
-    self.assertTrue(tracker.try_claim(250))
-    # get_position_for_fraction_consumed should fail for an unbounded range
-    with self.assertRaises(Exception):
-      tracker.position_at_fraction(0.5)
-
-  def test_try_return_first_record_not_split_point(self):
-    with self.assertRaises(Exception):
-      range_trackers.OffsetRangeTracker(100, 200).set_current_position(120)
-
-  def test_try_return_record_non_monotonic(self):
-    tracker = range_trackers.OffsetRangeTracker(100, 200)
-    self.assertTrue(tracker.try_claim(120))
-    with self.assertRaises(Exception):
-      tracker.try_claim(110)
-
-
-class GroupedShuffleRangeTrackerTest(unittest.TestCase):
-
-  def bytes_to_position(self, bytes_array):
-    return array.array('B', bytes_array).tostring()
-
-  def test_try_return_record_in_infinite_range(self):
-    tracker = range_trackers.GroupedShuffleRangeTracker('', '')
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([1, 2, 3])))
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([1, 2, 5])))
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([3, 6, 8, 10])))
-
-  def test_try_return_record_finite_range(self):
-    tracker = range_trackers.GroupedShuffleRangeTracker(
-        self.bytes_to_position([1, 0, 0]), self.bytes_to_position([5, 0, 0]))
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([1, 2, 3])))
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([1, 2, 5])))
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([3, 6, 8, 10])))
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([4, 255, 255, 255])))
-    # Should fail for positions that are lexicographically equal to or larger
-    # than the defined stop position.
-    self.assertFalse(copy.copy(tracker).try_claim(
-        self.bytes_to_position([5, 0, 0])))
-    self.assertFalse(copy.copy(tracker).try_claim(
-        self.bytes_to_position([5, 0, 1])))
-    self.assertFalse(copy.copy(tracker).try_claim(
-        self.bytes_to_position([6, 0, 0])))
-
-  def test_try_return_record_with_non_split_point(self):
-    tracker = range_trackers.GroupedShuffleRangeTracker(
-        self.bytes_to_position([1, 0, 0]), self.bytes_to_position([5, 0, 0]))
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([1, 2, 3])))
-    tracker.set_current_position(self.bytes_to_position([1, 2, 3]))
-    tracker.set_current_position(self.bytes_to_position([1, 2, 3]))
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([1, 2, 5])))
-    tracker.set_current_position(self.bytes_to_position([1, 2, 5]))
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([3, 6, 8, 10])))
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([4, 255, 255, 255])))
-
-  def test_first_record_non_split_point(self):
-    tracker = range_trackers.GroupedShuffleRangeTracker(
-        self.bytes_to_position([3, 0, 0]), self.bytes_to_position([5, 0, 0]))
-    with self.assertRaises(ValueError):
-      tracker.set_current_position(self.bytes_to_position([3, 4, 5]))
-
-  def test_non_split_point_record_with_different_position(self):
-    tracker = range_trackers.GroupedShuffleRangeTracker(
-        self.bytes_to_position([3, 0, 0]), self.bytes_to_position([5, 0, 0]))
-    self.assertTrue(tracker.try_claim(self.bytes_to_position([3, 4, 5])))
-    with self.assertRaises(ValueError):
-      tracker.set_current_position(self.bytes_to_position([3, 4, 6]))
-
-  def test_try_return_record_before_start(self):
-    tracker = range_trackers.GroupedShuffleRangeTracker(
-        self.bytes_to_position([3, 0, 0]), self.bytes_to_position([5, 0, 0]))
-    with self.assertRaises(ValueError):
-      tracker.try_claim(self.bytes_to_position([1, 2, 3]))
-
-  def test_try_return_non_monotonic(self):
-    tracker = range_trackers.GroupedShuffleRangeTracker(
-        self.bytes_to_position([3, 0, 0]), self.bytes_to_position([5, 0, 0]))
-    self.assertTrue(tracker.try_claim(self.bytes_to_position([3, 4, 5])))
-    self.assertTrue(tracker.try_claim(self.bytes_to_position([3, 4, 6])))
-    with self.assertRaises(ValueError):
-      tracker.try_claim(self.bytes_to_position([3, 2, 1]))
-
-  def test_try_return_identical_positions(self):
-    tracker = range_trackers.GroupedShuffleRangeTracker(
-        self.bytes_to_position([3, 0, 0]), self.bytes_to_position([5, 0, 0]))
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([3, 4, 5])))
-    with self.assertRaises(ValueError):
-      tracker.try_claim(self.bytes_to_position([3, 4, 5]))
-
-  def test_try_split_at_position_infinite_range(self):
-    tracker = range_trackers.GroupedShuffleRangeTracker('', '')
-    # Should fail before first record is returned.
-    self.assertFalse(tracker.try_split(
-        self.bytes_to_position([3, 4, 5, 6])))
-
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([1, 2, 3])))
-
-    # Should now succeed.
-    self.assertIsNotNone(tracker.try_split(
-        self.bytes_to_position([3, 4, 5, 6])))
-    # Should not split at same or larger position.
-    self.assertIsNone(tracker.try_split(
-        self.bytes_to_position([3, 4, 5, 6])))
-    self.assertIsNone(tracker.try_split(
-        self.bytes_to_position([3, 4, 5, 6, 7])))
-    self.assertIsNone(tracker.try_split(
-        self.bytes_to_position([4, 5, 6, 7])))
-
-    # Should split at smaller position.
-    self.assertIsNotNone(tracker.try_split(
-        self.bytes_to_position([3, 2, 1])))
-
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([2, 3, 4])))
-
-    # Should not split at a position we're already past.
-    self.assertIsNone(tracker.try_split(
-        self.bytes_to_position([2, 3, 4])))
-    self.assertIsNone(tracker.try_split(
-        self.bytes_to_position([2, 3, 3])))
-
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([3, 2, 0])))
-    self.assertFalse(tracker.try_claim(
-        self.bytes_to_position([3, 2, 1])))
-
-  def test_try_test_split_at_position_finite_range(self):
-    tracker = range_trackers.GroupedShuffleRangeTracker(
-        self.bytes_to_position([0, 0, 0]),
-        self.bytes_to_position([10, 20, 30]))
-    # Should fail before first record is returned.
-    self.assertFalse(tracker.try_split(
-        self.bytes_to_position([0, 0, 0])))
-    self.assertFalse(tracker.try_split(
-        self.bytes_to_position([3, 4, 5, 6])))
-
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([1, 2, 3])))
-
-    # Should now succeed.
-    self.assertTrue(tracker.try_split(
-        self.bytes_to_position([3, 4, 5, 6])))
-    # Should not split at same or larger position.
-    self.assertFalse(tracker.try_split(
-        self.bytes_to_position([3, 4, 5, 6])))
-    self.assertFalse(tracker.try_split(
-        self.bytes_to_position([3, 4, 5, 6, 7])))
-    self.assertFalse(tracker.try_split(
-        self.bytes_to_position([4, 5, 6, 7])))
-
-    # Should split at smaller position.
-    self.assertTrue(tracker.try_split(
-        self.bytes_to_position([3, 2, 1])))
-    # But not at a position at or before last returned record.
-    self.assertFalse(tracker.try_split(
-        self.bytes_to_position([1, 2, 3])))
-
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([2, 3, 4])))
-    self.assertTrue(tracker.try_claim(
-        self.bytes_to_position([3, 2, 0])))
-    self.assertFalse(tracker.try_claim(
-        self.bytes_to_position([3, 2, 1])))
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/io/sources_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/io/sources_test.py b/sdks/python/google/cloud/dataflow/io/sources_test.py
deleted file mode 100644
index 512dc1a..0000000
--- a/sdks/python/google/cloud/dataflow/io/sources_test.py
+++ /dev/null
@@ -1,65 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Unit tests for the sources framework."""
-
-import logging
-import tempfile
-import unittest
-
-import google.cloud.dataflow as df
-
-from google.cloud.dataflow.io import iobase
-from google.cloud.dataflow.transforms.util import assert_that
-from google.cloud.dataflow.transforms.util import equal_to
-
-
-class LineSource(iobase.BoundedSource):
-  """A simple source that reads lines from a given file."""
-
-  def __init__(self, file_name):
-    self._file_name = file_name
-
-  def read(self, _):
-    with open(self._file_name) as f:
-      for line in f:
-        yield line.rstrip('\n')
-
-
-class SourcesTest(unittest.TestCase):
-
-  def _create_temp_file(self, contents):
-    with tempfile.NamedTemporaryFile(delete=False) as f:
-      f.write(contents)
-      return f.name
-
-  def test_read_from_source(self):
-    file_name = self._create_temp_file('aaaa\nbbbb\ncccc\ndddd')
-
-    source = LineSource(file_name)
-    result = [line for line in source.read(None)]
-
-    self.assertItemsEqual(['aaaa', 'bbbb', 'cccc', 'dddd'], result)
-
-  def test_run_direct(self):
-    file_name = self._create_temp_file('aaaa\nbbbb\ncccc\ndddd')
-    pipeline = df.Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | df.Read(LineSource(file_name))
-    assert_that(pcoll, equal_to(['aaaa', 'bbbb', 'cccc', 'dddd']))
-
-    pipeline.run()
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  unittest.main()