Posted to commits@beam.apache.org by ro...@apache.org on 2016/09/23 20:54:30 UTC

[1/6] incubator-beam git commit: Move ConcatSource into its own module.

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 0fa9c4be6 -> 753cc9c2e


Move ConcatSource into its own module.

Also added some more tests.
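
For existing callers such as FileBasedSource, the visible change is the import
path: apache_beam.io.iobase.ConcatSource becomes
apache_beam.io.concat_source.ConcatSource (see the filebasedsource.py hunk
below). A minimal usage sketch, assuming the RangeSource helper defined in the
new concat_source_test.py below (any other BoundedSource works the same way):

    from apache_beam.io.concat_source import ConcatSource
    from apache_beam.io.concat_source_test import RangeSource

    # Concatenate two sub-sources; reading the full range yields 0..7.
    source = ConcatSource([RangeSource(0, 4), RangeSource(4, 8)])
    tracker = source.get_range_tracker()  # covers all sub-sources by default
    assert list(source.read(tracker)) == range(8)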


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d189f83e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d189f83e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d189f83e

Branch: refs/heads/python-sdk
Commit: d189f83e4fb89e7cdc36e267bdf4d30c35bb03e8
Parents: 0ffe7cc
Author: Robert Bradshaw <ro...@google.com>
Authored: Thu Sep 22 15:34:04 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Fri Sep 23 13:44:57 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/concat_source.py     | 264 +++++++++++++++++++
 .../python/apache_beam/io/concat_source_test.py | 222 ++++++++++++++++
 sdks/python/apache_beam/io/filebasedsource.py   |   3 +-
 .../apache_beam/io/filebasedsource_test.py      |   2 +-
 sdks/python/apache_beam/io/iobase.py            | 234 ----------------
 sdks/python/apache_beam/io/source_test_utils.py |   2 +-
 sdks/python/apache_beam/io/sources_test.py      | 100 -------
 7 files changed, 490 insertions(+), 337 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d189f83e/sdks/python/apache_beam/io/concat_source.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/concat_source.py b/sdks/python/apache_beam/io/concat_source.py
new file mode 100644
index 0000000..f2bd238
--- /dev/null
+++ b/sdks/python/apache_beam/io/concat_source.py
@@ -0,0 +1,264 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Concat Source, which reads the union of several other sources.
+"""
+
+import bisect
+import threading
+
+from apache_beam.io import iobase
+
+
+class ConcatSource(iobase.BoundedSource):
+  """A ``BoundedSource`` that can group a set of ``BoundedSources``.
+
+  Primarily for internal use; use the ``apache_beam.Flatten`` transform
+  to create the union of several reads.
+  """
+
+  def __init__(self, sources):
+    self._source_bundles = [source if isinstance(source, iobase.SourceBundle)
+                            else iobase.SourceBundle(None, source, None, None)
+                            for source in sources]
+
+  @property
+  def sources(self):
+    return [s.source for s in self._source_bundles]
+
+  def estimate_size(self):
+    return sum(s.source.estimate_size() for s in self._source_bundles)
+
+  def split(
+      self, desired_bundle_size=None, start_position=None, stop_position=None):
+    if start_position or stop_position:
+      raise ValueError(
+          'Multi-level initial splitting is not supported. Expected start and '
+          'stop positions to be None. Received %r and %r respectively.' %
+          (start_position, stop_position))
+
+    for source in self._source_bundles:
+      # We assume all sub-sources produce bundles that specify their weight
+      # using the same unit. For example, every sub-source may specify its
+      # size in bytes as its weight.
+      for bundle in source.source.split(
+          desired_bundle_size, source.start_position, source.stop_position):
+        yield bundle
+
+  def get_range_tracker(self, start_position=None, stop_position=None):
+    if start_position is None:
+      start_position = (0, None)
+    if stop_position is None:
+      stop_position = (len(self._source_bundles), None)
+    return ConcatRangeTracker(
+        start_position, stop_position, self._source_bundles)
+
+  def read(self, range_tracker):
+    start_source, _ = range_tracker.start_position()
+    stop_source, stop_pos = range_tracker.stop_position()
+    if stop_pos is not None:
+      stop_source += 1
+    for source_ix in range(start_source, stop_source):
+      if not range_tracker.try_claim((source_ix, None)):
+        break
+      for record in self._source_bundles[source_ix].source.read(
+          range_tracker.sub_range_tracker(source_ix)):
+        yield record
+
+  def default_output_coder(self):
+    if self._source_bundles:
+      # Get the coder from the first sub-source. This assumes all sub-sources
+      # produce the same coder.
+      return self._source_bundles[0].source.default_output_coder()
+    else:
+      # Defaulting to PickleCoder.
+      return super(ConcatSource, self).default_output_coder()
+
+
+class ConcatRangeTracker(iobase.RangeTracker):
+  """Range tracker for ConcatSource"""
+
+  def __init__(self, start, end, source_bundles):
+    """Initializes ``ConcatRangeTracker``
+
+    Args:
+      start: start position, a tuple of (source_index, source_position)
+      end: end position, a tuple of (source_index, source_position)
+      source_bundles: the list of source bundles in the ConcatSource
+    """
+    super(ConcatRangeTracker, self).__init__()
+    self._start = start
+    self._end = end
+    self._source_bundles = source_bundles
+    self._lock = threading.RLock()
+    # Lazily-initialized list of RangeTrackers corresponding to each source.
+    self._range_trackers = [None] * len(source_bundles)
+    # The currently-being-iterated-over (and latest claimed) source.
+    self._claimed_source_ix = self._start[0]
+    # Now compute cumulative progress through the sources for converting
+    # between global fractions and fractions within specific sources.
+    # TODO(robertwb): Implement fraction-at-position to properly scale
+    # partial start and end sources.
+    # Note, however, that in practice splits are typically on source
+    # boundaries anyway.
+    last = end[0] if end[1] is None else end[0] + 1
+    self._cumulative_weights = (
+        [0] * start[0]
+        + self._compute_cumulative_weights(source_bundles[start[0]:last])
+        + [1] * (len(source_bundles) - last - start[0]))
+
+  @staticmethod
+  def _compute_cumulative_weights(source_bundles):
+    # Two adjacent sources must differ so that they can be uniquely
+    # identified by a single global fraction.  Let min_diff be the
+    # smallest allowable difference between sources.
+    min_diff = 1e-5
+    # For the computation below, we need weights for all sources.
+    # Substitute average weights for those whose weights are
+    # unspecified (or 1.0 for everything if none are known).
+    known = [s.weight for s in source_bundles if s.weight is not None]
+    avg = sum(known) / len(known) if known else 1.0
+    weights = [s.weight or avg for s in source_bundles]
+
+    # Now compute running totals of the percent done upon reaching
+    # each source, with respect to the start and end positions.
+    # E.g. if the weights were [100, 20, 3] we would produce
+    # [0.0, 100/123, 120/123, 1.0]
+    total = float(sum(weights))
+    running_total = [0]
+    for w in weights:
+      running_total.append(
+          max(min_diff, min(1, running_total[-1] + w / total)))
+    running_total[-1] = 1  # In case of rounding error.
+    # There are issues if, due to rounding error or greatly differing sizes,
+    # two adjacent running totals are equal. Normalize the totals so that
+    # this never happens.
+    for k in range(1, len(running_total)):
+      if running_total[k] == running_total[k - 1]:
+        for j in range(k):
+          running_total[j] *= (1 - min_diff)
+    return running_total
+
+  def start_position(self):
+    return self._start
+
+  def stop_position(self):
+    return self._end
+
+  def try_claim(self, pos):
+    source_ix, source_pos = pos
+    with self._lock:
+      if source_ix > self._end[0]:
+        return False
+      elif source_ix == self._end[0] and self._end[1] is None:
+        return False
+      else:
+        assert source_ix >= self._claimed_source_ix
+        self._claimed_source_ix = source_ix
+        if source_pos is None:
+          return True
+        else:
+          return self.sub_range_tracker(source_ix).try_claim(source_pos)
+
+  def try_split(self, pos):
+    source_ix, source_pos = pos
+    with self._lock:
+      if source_ix < self._claimed_source_ix:
+        # Already claimed.
+        return None
+      elif source_ix > self._end[0]:
+        # After end.
+        return None
+      elif source_ix == self._end[0] and self._end[1] is None:
+        # At/after end.
+        return None
+      else:
+        if source_ix > self._claimed_source_ix:
+          # Prefer to split on even boundary.
+          split_pos = None
+          ratio = self._cumulative_weights[source_ix]
+        else:
+          # Split the current subsource.
+          split = self.sub_range_tracker(source_ix).try_split(
+              source_pos)
+          if not split:
+            return None
+          split_pos, frac = split
+          ratio = self.local_to_global(source_ix, frac)
+
+        self._end = source_ix, split_pos
+        self._cumulative_weights = [min(w / ratio, 1)
+                                    for w in self._cumulative_weights]
+        return (source_ix, split_pos), ratio
+
+  def set_current_position(self, pos):
+    raise NotImplementedError('Should only be called on sub-trackers')
+
+  def position_at_fraction(self, fraction):
+    source_ix, source_frac = self.global_to_local(fraction)
+    last = self._end[0] if self._end[1] is None else self._end[0] + 1
+    if source_ix == last:
+      return (source_ix, None)
+    else:
+      return (source_ix,
+              self.sub_range_tracker(source_ix).position_at_fraction(
+                  source_frac))
+
+  def fraction_consumed(self):
+    with self._lock:
+      return self.local_to_global(
+          self._claimed_source_ix,
+          self.sub_range_tracker(self._claimed_source_ix)
+          .fraction_consumed())
+
+  def local_to_global(self, source_ix, source_frac):
+    cw = self._cumulative_weights
+    # The global fraction is the fraction to source_ix plus some portion of
+    # the way towards the next source.
+    return cw[source_ix] + source_frac * (cw[source_ix + 1] - cw[source_ix])
+
+  def global_to_local(self, frac):
+    if frac == 1:
+      last = self._end[0] if self._end[1] is None else self._end[0] + 1
+      return (last, None)
+    else:
+      cw = self._cumulative_weights
+      # Find the last source that starts at or before frac.
+      source_ix = bisect.bisect(cw, frac) - 1
+      # Return this source, converting what's left of frac after starting
+      # this source into a value in [0.0, 1.0) representing how far we are
+      # towards the next source.
+      return (source_ix,
+              (frac - cw[source_ix]) / (cw[source_ix + 1] - cw[source_ix]))
+
+  def sub_range_tracker(self, source_ix):
+    assert self._start[0] <= source_ix <= self._end[0]
+    if self._range_trackers[source_ix] is None:
+      with self._lock:
+        if self._range_trackers[source_ix] is None:
+          source = self._source_bundles[source_ix]
+          if source_ix == self._start[0] and self._start[1] is not None:
+            start = self._start[1]
+          else:
+            start = source.start_position
+          if source_ix == self._end[0] and self._end[1] is not None:
+            stop = self._end[1]
+          else:
+            stop = source.stop_position
+          self._range_trackers[source_ix] = source.source.get_range_tracker(
+              start, stop)
+    return self._range_trackers[source_ix]
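
A quick way to see the fraction bookkeeping above in isolation: the snippet
below mirrors the arithmetic of _compute_cumulative_weights, local_to_global
and global_to_local for the weights [100, 20, 3] used in the comment. It is a
standalone sketch only; it does not import Beam and it ignores the min_diff
normalization and the frac == 1 special case handled by the real tracker.

    import bisect

    weights = [100., 20., 3.]
    total = sum(weights)
    # Running totals of the percent done upon reaching each source:
    # [0.0, 100/123, 120/123, 1.0]
    cw = [0.0]
    for w in weights:
        cw.append(cw[-1] + w / total)
    cw[-1] = 1.0  # guard against rounding error

    def global_to_local(frac):
        ix = bisect.bisect(cw, frac) - 1
        return ix, (frac - cw[ix]) / (cw[ix + 1] - cw[ix])

    def local_to_global(ix, frac):
        return cw[ix] + frac * (cw[ix + 1] - cw[ix])

    print(global_to_local(0.9))        # ~(1, 0.535): 53% through source 1
    print(local_to_global(1, 0.535))   # ~0.9, the round trip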

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d189f83e/sdks/python/apache_beam/io/concat_source_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/concat_source_test.py b/sdks/python/apache_beam/io/concat_source_test.py
new file mode 100644
index 0000000..828bdb0
--- /dev/null
+++ b/sdks/python/apache_beam/io/concat_source_test.py
@@ -0,0 +1,222 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for ConcatSource."""
+
+import logging
+import unittest
+
+import apache_beam as beam
+
+from apache_beam.io import iobase
+from apache_beam.io import range_trackers
+from apache_beam.io import source_test_utils
+from apache_beam.io.concat_source import ConcatSource
+from apache_beam.transforms.util import assert_that
+from apache_beam.transforms.util import equal_to
+
+
+class RangeSource(iobase.BoundedSource):
+
+  def __init__(self, start, end, split_freq=1):
+    assert start <= end
+    self._start = start
+    self._end = end
+    self._split_freq = split_freq
+
+  def _normalize(self, start_position, end_position):
+    return (self._start if start_position is None else start_position,
+            self._end if end_position is None else end_position)
+
+  def _round_up(self, index):
+    """Rounds up to the nearest multiple of split_freq."""
+    return index - index % -self._split_freq
+
+  def estimate_size(self):
+    return self._end - self._start
+
+  def split(self, desired_bundle_size, start_position=None, end_position=None):
+    start, end = self._normalize(start_position, end_position)
+    for sub_start in range(start, end, desired_bundle_size):
+      sub_end = min(self._end, sub_start + desired_bundle_size)
+      yield iobase.SourceBundle(
+          sub_end - sub_start,
+          RangeSource(sub_start, sub_end, self._split_freq),
+          None, None)
+
+  def get_range_tracker(self, start_position, end_position):
+    start, end = self._normalize(start_position, end_position)
+    return range_trackers.OffsetRangeTracker(start, end)
+
+  def read(self, range_tracker):
+    for k in range(self._round_up(range_tracker.start_position()),
+                   self._round_up(range_tracker.stop_position())):
+      if k % self._split_freq == 0:
+        if not range_tracker.try_claim(k):
+          return
+      yield k
+
+
+class ConcatSourceTest(unittest.TestCase):
+
+  def test_range_source(self):
+    source_test_utils.assertSplitAtFractionExhaustive(RangeSource(0, 10, 3))
+
+  def test_concat_source(self):
+    source = ConcatSource([RangeSource(0, 4),
+                           RangeSource(4, 8),
+                           RangeSource(8, 12),
+                           RangeSource(12, 16),
+                          ])
+    self.assertEqual(list(source.read(source.get_range_tracker())),
+                     range(16))
+    self.assertEqual(list(source.read(source.get_range_tracker((1, None),
+                                                               (2, 10)))),
+                     range(4, 10))
+    range_tracker = source.get_range_tracker(None, None)
+    self.assertEqual(range_tracker.position_at_fraction(0), (0, 0))
+    self.assertEqual(range_tracker.position_at_fraction(.5), (2, 8))
+    self.assertEqual(range_tracker.position_at_fraction(.625), (2, 10))
+
+    # Simulate a read.
+    self.assertEqual(range_tracker.try_claim((0, None)), True)
+    self.assertEqual(range_tracker.sub_range_tracker(0).try_claim(2), True)
+    self.assertEqual(range_tracker.fraction_consumed(), 0.125)
+
+    self.assertEqual(range_tracker.try_claim((1, None)), True)
+    self.assertEqual(range_tracker.sub_range_tracker(1).try_claim(6), True)
+    self.assertEqual(range_tracker.fraction_consumed(), 0.375)
+    self.assertEqual(range_tracker.try_split((0, 1)), None)
+    self.assertEqual(range_tracker.try_split((1, 5)), None)
+
+    self.assertEqual(range_tracker.try_split((3, 14)), ((3, None), 0.75))
+    self.assertEqual(range_tracker.try_claim((3, None)), False)
+    self.assertEqual(range_tracker.sub_range_tracker(1).try_claim(7), True)
+    self.assertEqual(range_tracker.try_claim((2, None)), True)
+    self.assertEqual(range_tracker.sub_range_tracker(2).try_claim(9), True)
+
+    self.assertEqual(range_tracker.try_split((2, 8)), None)
+    self.assertEqual(range_tracker.try_split((2, 11)), ((2, 11), 11. / 12))
+    self.assertEqual(range_tracker.sub_range_tracker(2).try_claim(10), True)
+    self.assertEqual(range_tracker.sub_range_tracker(2).try_claim(11), False)
+
+  def test_estimate_size(self):
+    source = ConcatSource([RangeSource(0, 10),
+                           RangeSource(10, 100),
+                           RangeSource(100, 1000),
+                          ])
+    self.assertEqual(source.estimate_size(), 1000)
+
+  def test_position_at_fraction(self):
+    ranges = [(0, 4), (4, 16), (16, 24), (24, 32)]
+    source = ConcatSource([iobase.SourceBundle((range[1] - range[0]) / 32.,
+                                               RangeSource(*range),
+                                               None, None)
+                           for range in ranges])
+
+    range_tracker = source.get_range_tracker()
+    self.assertEquals(range_tracker.position_at_fraction(0), (0, 0))
+    self.assertEquals(range_tracker.position_at_fraction(.01), (0, 1))
+    self.assertEquals(range_tracker.position_at_fraction(.1), (0, 4))
+    self.assertEquals(range_tracker.position_at_fraction(.125), (1, 4))
+    self.assertEquals(range_tracker.position_at_fraction(.2), (1, 7))
+    self.assertEquals(range_tracker.position_at_fraction(.7), (2, 23))
+    self.assertEquals(range_tracker.position_at_fraction(.75), (3, 24))
+    self.assertEquals(range_tracker.position_at_fraction(.8), (3, 26))
+    self.assertEquals(range_tracker.position_at_fraction(1), (4, None))
+
+    range_tracker = source.get_range_tracker((1, None), (3, None))
+    self.assertEquals(range_tracker.position_at_fraction(0), (1, 4))
+    self.assertEquals(range_tracker.position_at_fraction(.01), (1, 5))
+    self.assertEquals(range_tracker.position_at_fraction(.5), (1, 14))
+    self.assertEquals(range_tracker.position_at_fraction(.599), (1, 16))
+    self.assertEquals(range_tracker.position_at_fraction(.601), (2, 17))
+    self.assertEquals(range_tracker.position_at_fraction(1), (3, None))
+
+  def test_empty_source(self):
+    read_all = source_test_utils.readFromSource
+
+    empty = RangeSource(0, 0)
+    self.assertEquals(read_all(ConcatSource([])), [])
+    self.assertEquals(read_all(ConcatSource([empty])), [])
+    self.assertEquals(read_all(ConcatSource([empty, empty])), [])
+
+    range10 = RangeSource(0, 10)
+    self.assertEquals(read_all(ConcatSource([range10]), (0, None), (0, 0)),
+                      [])
+    self.assertEquals(read_all(ConcatSource([range10]), (0, 10), (1, None)),
+                      [])
+    self.assertEquals(read_all(ConcatSource([range10, range10]),
+                               (0, 10), (1, 0)),
+                      [])
+
+  def test_single_source(self):
+    read_all = source_test_utils.readFromSource
+
+    range10 = RangeSource(0, 10)
+    self.assertEquals(read_all(ConcatSource([range10])), range(10))
+    self.assertEquals(read_all(ConcatSource([range10]), (0, 5)), range(5, 10))
+    self.assertEquals(read_all(ConcatSource([range10]), None, (0, 5)),
+                      range(5))
+
+  def test_source_with_empty_ranges(self):
+    read_all = source_test_utils.readFromSource
+
+    empty = RangeSource(0, 0)
+    self.assertEquals(read_all(empty), [])
+
+    range10 = RangeSource(0, 10)
+    self.assertEquals(read_all(ConcatSource([empty, empty, range10])),
+                      range(10))
+    self.assertEquals(read_all(ConcatSource([empty, range10, empty])),
+                      range(10))
+    self.assertEquals(read_all(ConcatSource([range10, empty, range10, empty])),
+                      range(10) + range(10))
+
+  def test_source_with_empty_ranges_exhaustive(self):
+    empty = RangeSource(0, 0)
+    source = ConcatSource([empty,
+                           RangeSource(0, 10),
+                           empty,
+                           empty,
+                           RangeSource(10, 13),
+                           RangeSource(13, 17),
+                           empty,
+                          ])
+    source_test_utils.assertSplitAtFractionExhaustive(source)
+
+  def test_run_concat_direct(self):
+    source = ConcatSource([RangeSource(0, 10),
+                           RangeSource(10, 100),
+                           RangeSource(100, 1000),
+                          ])
+    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | beam.Read(source)
+    assert_that(pcoll, equal_to(range(1000)))
+
+    pipeline.run()
+
+  def test_concat_source_exhaustive(self):
+    source = ConcatSource([RangeSource(0, 10),
+                           RangeSource(100, 110),
+                           RangeSource(1000, 1010),
+                          ])
+    source_test_utils.assertSplitAtFractionExhaustive(source)
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d189f83e/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 2911b9f..8ff69ca 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -28,6 +28,7 @@ For an example implementation of ``FileBasedSource`` see ``avroio.AvroSource``.
 from multiprocessing.pool import ThreadPool
 
 from apache_beam.internal import pickler
+from apache_beam.io import concat_source
 from apache_beam.io import fileio
 from apache_beam.io import iobase
 from apache_beam.io import range_trackers
@@ -107,7 +108,7 @@ class FileBasedSource(iobase.BoundedSource):
             sizes[index],
             min_bundle_size=self._min_bundle_size)
         single_file_sources.append(single_file_source)
-      self._concat_source = iobase.ConcatSource(single_file_sources)
+      self._concat_source = concat_source.ConcatSource(single_file_sources)
     return self._concat_source
 
   def open_file(self, file_name):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d189f83e/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index 3efcc16..d45a6f9 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -30,7 +30,7 @@ from apache_beam.io import iobase
 from apache_beam.io import range_trackers
 
 # importing following private classes for testing
-from apache_beam.io.iobase import ConcatSource
+from apache_beam.io.concat_source import ConcatSource
 from apache_beam.io.filebasedsource import _SingleFileSource as SingleFileSource
 
 from apache_beam.io.filebasedsource import FileBasedSource

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d189f83e/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index ac06468..4305fb6 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -31,10 +31,8 @@ the sink.
 
 from collections import namedtuple
 
-import bisect
 import logging
 import random
-import threading
 import uuid
 
 from apache_beam import pvalue
@@ -1099,235 +1097,3 @@ class _RoundRobinKeyFn(core.DoFn):
     if self.counter >= self.count:
       self.counter -= self.count
     yield self.counter, context.element
-
-
-class ConcatSource(BoundedSource):
-  """A ``BoundedSource`` that can group a set of ``BoundedSources``."""
-
-  class ConcatRangeTracker(RangeTracker):
-
-    def __init__(self, start, end, source_bundles):
-      """Initializes ``ConcatRangeTracker``
-
-      Args:
-        start: start position, a tuple of (source_index, source_position)
-        end: end position, a tuple of (source_index, source_position)
-        source_bundles: the list of source bundles in the ConcatSource
-      """
-      super(ConcatSource.ConcatRangeTracker, self).__init__()
-      self._start = start
-      self._end = end
-      self._source_bundles = source_bundles
-      self._lock = threading.RLock()
-      # Lazily-initialized list of RangeTrackers corresponding to each source.
-      self._range_trackers = [None] * len(source_bundles)
-      # The currently-being-iterated-over (and latest claimed) source.
-      self._claimed_source_ix = self._start[0]
-      # Now compute cumulative progress through the sources for converting
-      # between global fractions and fractions within specific sources.
-      # TODO(robertwb): Implement fraction-at-position to properly scale
-      # partial start and end sources.
-      # Note, however, that in practice splits are typically on source
-      # boundaries anyways.
-      last = end[0] if end[1] is None else end[0] + 1
-      self._cumulative_weights = (
-          [0] * start[0]
-          + self._compute_cumulative_weights(source_bundles[start[0]:last])
-          + [1] * (len(source_bundles) - last - start[0]))
-
-    @staticmethod
-    def _compute_cumulative_weights(source_bundles):
-      # Two adjacent sources must differ so that they can be uniquely
-      # identified by a single global fraction.  Let min_diff be the
-      # smallest allowable difference between sources.
-      min_diff = 1e-5
-      # For the computation below, we need weights for all sources.
-      # Substitute average weights for those whose weights are
-      # unspecified (or 1.0 for everything if none are known).
-      known = [s.weight for s in source_bundles if s.weight is not None]
-      avg = sum(known) / len(known) if known else 1.0
-      weights = [s.weight or avg for s in source_bundles]
-
-      # Now compute running totals of the percent done upon reaching
-      # each source, with respect to the start and end positions.
-      # E.g. if the weights were [100, 20, 3] we would produce
-      # [0.0, 100/123, 120/123, 1.0]
-      total = float(sum(weights))
-      running_total = [0]
-      for w in weights:
-        running_total.append(
-            max(min_diff, min(1, running_total[-1] + w / total)))
-      running_total[-1] = 1  # In case of rounding error.
-      # There are issues if, due to rouding error or greatly differing sizes,
-      # two adjacent running total weights are equal. Normalize this things so
-      # that this never happens.
-      for k in range(1, len(running_total)):
-        if running_total[k] == running_total[k - 1]:
-          for j in range(k):
-            running_total[j] *= (1 - min_diff)
-      return running_total
-
-    def start_position(self):
-      return self._start
-
-    def stop_position(self):
-      return self._end
-
-    def try_claim(self, pos):
-      source_ix, source_pos = pos
-      with self._lock:
-        if source_ix > self._end[0]:
-          return False
-        elif source_ix == self._end[0] and self._end[1] is None:
-          return False
-        else:
-          assert source_ix >= self._claimed_source_ix
-          self._claimed_source_ix = source_ix
-          if source_pos is None:
-            return True
-          else:
-            return self.sub_range_tracker(source_ix).try_claim(source_pos)
-
-    def try_split(self, pos):
-      source_ix, source_pos = pos
-      with self._lock:
-        if source_ix < self._claimed_source_ix:
-          # Already claimed.
-          return None
-        elif source_ix > self._end[0]:
-          # After end.
-          return None
-        elif source_ix == self._end[0] and self._end[1] is None:
-          # At/after end.
-          return None
-        else:
-          if source_ix > self._claimed_source_ix:
-            # Prefer to split on even boundary.
-            split_pos = None
-            ratio = self._cumulative_weights[source_ix]
-          else:
-            # Split the current subsource.
-            split = self.sub_range_tracker(source_ix).try_split(
-                source_pos)
-            if not split:
-              return None
-            split_pos, frac = split
-            ratio = self.local_to_global(source_ix, frac)
-
-          self._end = source_ix, split_pos
-          self._cumulative_weights = [min(w / ratio, 1)
-                                      for w in self._cumulative_weights]
-          return (source_ix, split_pos), ratio
-
-    def set_current_position(self, pos):
-      raise NotImplementedError('Should only be called on sub-trackers')
-
-    def position_at_fraction(self, fraction):
-      source_ix, source_frac = self.global_to_local(fraction)
-      if source_ix == len(self._source_bundles):
-        return (source_ix, None)
-      else:
-        return (source_ix,
-                self.sub_range_tracker(source_ix).position_at_fraction(
-                    source_frac))
-
-    def fraction_consumed(self):
-      with self._lock:
-        return self.local_to_global(
-            self._claimed_source_ix,
-            self.sub_range_tracker(self._claimed_source_ix)
-            .fraction_consumed())
-
-    def local_to_global(self, source_ix, source_frac):
-      cw = self._cumulative_weights
-      # The global fraction is the fraction to source_ix plus some portion of
-      # the way towards the next source.
-      return cw[source_ix] + source_frac * (cw[source_ix + 1] - cw[source_ix])
-
-    def global_to_local(self, frac):
-      if frac == 1:
-        return (len(self._source_bundles), 0)
-      else:
-        cw = self._cumulative_weights
-        # Find the last source that starts at or before frac.
-        source_ix = bisect.bisect(cw, frac) - 1
-        # Return this source, converting what's left of frac after starting
-        # this source into a value in [0.0, 1.0) representing how far we are
-        # towards the next source.
-        return (source_ix,
-                (frac - cw[source_ix]) / (cw[source_ix + 1] - cw[source_ix]))
-
-    def sub_range_tracker(self, source_ix):
-      assert self._start[0] <= source_ix <= self._end[0]
-      if self._range_trackers[source_ix] is None:
-        with self._lock:
-          if self._range_trackers[source_ix] is None:
-            source = self._source_bundles[source_ix]
-            if source_ix == self._start[0] and self._start[1] is not None:
-              start = self._start[1]
-            else:
-              start = source.start_position
-            if source_ix == self._end[0] and self._end[1] is not None:
-              stop = self._end[1]
-            else:
-              stop = source.stop_position
-            self._range_trackers[source_ix] = source.source.get_range_tracker(
-                start, stop)
-      return self._range_trackers[source_ix]
-
-  def __init__(self, sources):
-    self._source_bundles = [source if isinstance(source, SourceBundle)
-                            else SourceBundle(None, source, None, None)
-                            for source in sources]
-
-  @property
-  def sources(self):
-    return [s.source for s in self._source_bundles]
-
-  def estimate_size(self):
-    return sum(s.source.estimate_size() for s in self._source_bundles)
-
-  def split(
-      self, desired_bundle_size=None, start_position=None, stop_position=None):
-    if start_position or stop_position:
-      raise ValueError(
-          'Multi-level initial splitting is not supported. Expected start and '
-          'stop positions to be None. Received %r and %r respectively.' %
-          (start_position, stop_position))
-
-    for source in self._source_bundles:
-      # We assume all sub-sources to produce bundles that specify weight using
-      # the same unit. For example, all sub-sources may specify the size in
-      # bytes as their weight.
-      for bundle in source.source.split(
-          desired_bundle_size, source.start_position, source.stop_position):
-        yield bundle
-
-  def get_range_tracker(self, start_position=None, stop_position=None):
-    if start_position is None:
-      start_position = (0, None)
-    if stop_position is None:
-      stop_position = (len(self._source_bundles), None)
-    return self.ConcatRangeTracker(
-        start_position, stop_position, self._source_bundles)
-
-  def read(self, range_tracker):
-    start_source, _ = range_tracker.start_position()
-    stop_source, stop_pos = range_tracker.stop_position()
-    if stop_pos is not None:
-      stop_source += 1
-    for source_ix in range(start_source, stop_source):
-      if not range_tracker.try_claim((source_ix, None)):
-        break
-      for record in self._source_bundles[source_ix].source.read(
-          range_tracker.sub_range_tracker(source_ix)):
-        yield record
-
-  def default_output_coder(self):
-    if self._source_bundles:
-      # Getting coder from the first sub-sources. This assumes all sub-sources
-      # to produce the same coder.
-      return self._source_bundles[0].source.default_output_coder()
-    else:
-      # Defaulting to PickleCoder.
-      return super(ConcatSource, self).default_output_coder()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d189f83e/sdks/python/apache_beam/io/source_test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/source_test_utils.py b/sdks/python/apache_beam/io/source_test_utils.py
index f52f16b..13b1e91 100644
--- a/sdks/python/apache_beam/io/source_test_utils.py
+++ b/sdks/python/apache_beam/io/source_test_utils.py
@@ -66,7 +66,7 @@ SplitFractionStatistics = namedtuple(
     'successful_fractions non_trivial_fractions')
 
 
-def readFromSource(source, start_position, stop_position):
+def readFromSource(source, start_position=None, stop_position=None):
   """Reads elements from the given ```BoundedSource```.
 
   Only reads elements within the given position range.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d189f83e/sdks/python/apache_beam/io/sources_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/sources_test.py b/sdks/python/apache_beam/io/sources_test.py
index e6f369e..35a13f4 100644
--- a/sdks/python/apache_beam/io/sources_test.py
+++ b/sdks/python/apache_beam/io/sources_test.py
@@ -27,7 +27,6 @@ import apache_beam as beam
 from apache_beam import coders
 from apache_beam.io import iobase
 from apache_beam.io import range_trackers
-from apache_beam.io import source_test_utils
 from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to
 
@@ -81,47 +80,6 @@ class LineSource(iobase.BoundedSource):
     return coders.BytesCoder()
 
 
-class RangeSource(iobase.BoundedSource):
-
-  def __init__(self, start, end, split_freq=1):
-    assert start <= end
-    self._start = start
-    self._end = end
-    self._split_freq = split_freq
-
-  def _normalize(self, start_position, end_position):
-    return (self._start if start_position is None else start_position,
-            self._end if end_position is None else end_position)
-
-  def _round_up(self, index):
-    """Rounds up to the nearest mulitple of split_freq."""
-    return index - index % -self._split_freq
-
-  def estimate_size(self):
-    return self._stop - self._start
-
-  def split(self, desired_bundle_size, start_position=None, end_position=None):
-    start, end = self._normalize(start_position, end_position)
-    for sub_start in range(start, end, desired_bundle_size):
-      sub_end = min(self._end, sub_start + desired_bundle_size)
-      yield iobase.SourceBundle(
-          sub_end - sub_start,
-          RangeSource(sub_start, sub_end, self._split_freq),
-          None, None)
-
-  def get_range_tracker(self, start_position, end_position):
-    start, end = self._normalize(start_position, end_position)
-    return range_trackers.OffsetRangeTracker(start, end)
-
-  def read(self, range_tracker):
-    for k in range(self._round_up(range_tracker.start_position()),
-                   self._round_up(range_tracker.stop_position())):
-      if k % self._split_freq == 0:
-        if not range_tracker.try_claim(k):
-          return
-      yield k
-
-
 class SourcesTest(unittest.TestCase):
 
   def _create_temp_file(self, contents):
@@ -146,64 +104,6 @@ class SourcesTest(unittest.TestCase):
 
     pipeline.run()
 
-  def test_range_source(self):
-    source_test_utils.assertSplitAtFractionExhaustive(RangeSource(0, 10, 3))
-
-  def test_conact_source(self):
-    source = iobase.ConcatSource([RangeSource(0, 4),
-                                  RangeSource(4, 8),
-                                  RangeSource(8, 12),
-                                  RangeSource(12, 16),
-                                 ])
-    self.assertEqual(list(source.read(source.get_range_tracker())),
-                     range(16))
-    self.assertEqual(list(source.read(source.get_range_tracker((1, None),
-                                                               (2, 10)))),
-                     range(4, 10))
-    range_tracker = source.get_range_tracker(None, None)
-    self.assertEqual(range_tracker.position_at_fraction(0), (0, 0))
-    self.assertEqual(range_tracker.position_at_fraction(.5), (2, 8))
-    self.assertEqual(range_tracker.position_at_fraction(.625), (2, 10))
-
-    # Simulate a read.
-    self.assertEqual(range_tracker.try_claim((0, None)), True)
-    self.assertEqual(range_tracker.sub_range_tracker(0).try_claim(2), True)
-    self.assertEqual(range_tracker.fraction_consumed(), 0.125)
-
-    self.assertEqual(range_tracker.try_claim((1, None)), True)
-    self.assertEqual(range_tracker.sub_range_tracker(1).try_claim(6), True)
-    self.assertEqual(range_tracker.fraction_consumed(), 0.375)
-    self.assertEqual(range_tracker.try_split((0, 1)), None)
-    self.assertEqual(range_tracker.try_split((1, 5)), None)
-
-    self.assertEqual(range_tracker.try_split((3, 14)), ((3, None), 0.75))
-    self.assertEqual(range_tracker.try_claim((3, None)), False)
-    self.assertEqual(range_tracker.sub_range_tracker(1).try_claim(7), True)
-    self.assertEqual(range_tracker.try_claim((2, None)), True)
-    self.assertEqual(range_tracker.sub_range_tracker(2).try_claim(9), True)
-
-    self.assertEqual(range_tracker.try_split((2, 8)), None)
-    self.assertEqual(range_tracker.try_split((2, 11)), ((2, 11), 11. / 12))
-    self.assertEqual(range_tracker.sub_range_tracker(2).try_claim(10), True)
-    self.assertEqual(range_tracker.sub_range_tracker(2).try_claim(11), False)
-
-  def test_run_concat_direct(self):
-    source = iobase.ConcatSource([RangeSource(0, 10),
-                                  RangeSource(10, 100),
-                                  RangeSource(100, 1000),
-                                 ])
-    pipeline = beam.Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Read(source)
-    assert_that(pcoll, equal_to(range(1000)))
-
-    pipeline.run()
-
-  def test_conact_source_exhaustive(self):
-    source = iobase.ConcatSource([RangeSource(0, 10),
-                                  RangeSource(100, 110),
-                                  RangeSource(1000, 1010),
-                                 ])
-    source_test_utils.assertSplitAtFractionExhaustive(source)
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)



[2/6] incubator-beam git commit: Allow ConcatSource to take SourceBundles rather than raw Sources

Posted by ro...@apache.org.
Allow ConcatSource to take SourceBundles rather than raw Sources
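
In practice, both of the following are now accepted and read the same records;
the explicit bundles additionally carry per-source weights that drive progress
and split accounting. This is a sketch against the iobase.ConcatSource of this
commit, with RangeSource standing in for any BoundedSource (e.g. the helper in
sources_test.py):

    from apache_beam.io import iobase

    # Raw sources: each is wrapped as SourceBundle(None, source, None, None)
    # and the missing weights are defaulted when computing cumulative progress
    # through the sub-sources.
    by_source = iobase.ConcatSource([RangeSource(0, 4), RangeSource(4, 16)])

    # Explicit bundles: SourceBundle(weight, source, start_position,
    # stop_position), where the weights (here, relative sizes) control how a
    # global fraction of work is apportioned across the sub-sources.
    by_bundle = iobase.ConcatSource([
        iobase.SourceBundle(4 / 16., RangeSource(0, 4), None, None),
        iobase.SourceBundle(12 / 16., RangeSource(4, 16), None, None),
    ])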


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0ffe7ccf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0ffe7ccf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0ffe7ccf

Branch: refs/heads/python-sdk
Commit: 0ffe7ccffd46c2e0505d723cca4305bbc7fb8940
Parents: d65d17a
Author: Robert Bradshaw <ro...@google.com>
Authored: Tue Sep 13 17:21:20 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Fri Sep 23 13:44:57 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/iobase.py       | 136 ++++++++++++++++--------
 sdks/python/apache_beam/io/sources_test.py |  21 +++-
 2 files changed, 110 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0ffe7ccf/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 1205a26..ac06468 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -1106,34 +1106,66 @@ class ConcatSource(BoundedSource):
 
   class ConcatRangeTracker(RangeTracker):
 
-    def __init__(self, start, end, sources, weights=None):
+    def __init__(self, start, end, source_bundles):
+      """Initializes ``ConcatRangeTracker``
+
+      Args:
+        start: start position, a tuple of (source_index, source_position)
+        end: end position, a tuple of (source_index, source_position)
+        source_bundles: the list of source bundles in the ConcatSource
+      """
       super(ConcatSource.ConcatRangeTracker, self).__init__()
       self._start = start
       self._end = end
+      self._source_bundles = source_bundles
       self._lock = threading.RLock()
-      self._sources = sources
-      self._range_trackers = [None] * len(self._sources)
+      # Lazily-initialized list of RangeTrackers corresponding to each source.
+      self._range_trackers = [None] * len(source_bundles)
+      # The currently-being-iterated-over (and latest claimed) source.
       self._claimed_source_ix = self._start[0]
-
-      if weights is None:
-        n = max(1, end[0] - start[0])
-        self._cumulative_weights = [
-          max(0, min(1, float(k) / n))
-          for k in range(-start[0], len(self._sources) - start[0] + 1)]
-      else:
-        assert len(sources) == len(weights)
-        relevant_weights = weights[start[0]:end[0] + 1]
-        # TODO(robertwb): Implement fraction-at-position to properly scale
-        # partial start and end sources.
-        total = sum(relevant_weights)
-        running_total = [0]
-        for w in relevant_weights:
-          running_total.append(max(1, running_total[-1] + w / total))
-        running_total[-1] = 1  # In case of rounding error.
-        self._cumulative_weights = (
+      # Now compute cumulative progress through the sources for converting
+      # between global fractions and fractions within specific sources.
+      # TODO(robertwb): Implement fraction-at-position to properly scale
+      # partial start and end sources.
+      # Note, however, that in practice splits are typically on source
+      # boundaries anyway.
+      last = end[0] if end[1] is None else end[0] + 1
+      self._cumulative_weights = (
           [0] * start[0]
-          + running_total
-          + [1] * (len(self._sources) - end[0]))
+          + self._compute_cumulative_weights(source_bundles[start[0]:last])
+          + [1] * (len(source_bundles) - last - start[0]))
+
+    @staticmethod
+    def _compute_cumulative_weights(source_bundles):
+      # Two adjacent sources must differ so that they can be uniquely
+      # identified by a single global fraction.  Let min_diff be the
+      # smallest allowable difference between sources.
+      min_diff = 1e-5
+      # For the computation below, we need weights for all sources.
+      # Substitute average weights for those whose weights are
+      # unspecified (or 1.0 for everything if none are known).
+      known = [s.weight for s in source_bundles if s.weight is not None]
+      avg = sum(known) / len(known) if known else 1.0
+      weights = [s.weight or avg for s in source_bundles]
+
+      # Now compute running totals of the percent done upon reaching
+      # each source, with respect to the start and end positions.
+      # E.g. if the weights were [100, 20, 3] we would produce
+      # [0.0, 100/123, 120/123, 1.0]
+      total = float(sum(weights))
+      running_total = [0]
+      for w in weights:
+        running_total.append(
+            max(min_diff, min(1, running_total[-1] + w / total)))
+      running_total[-1] = 1  # In case of rounding error.
+      # There are issues if, due to rounding error or greatly differing sizes,
+      # two adjacent running totals are equal. Normalize the totals so that
+      # this never happens.
+      for k in range(1, len(running_total)):
+        if running_total[k] == running_total[k - 1]:
+          for j in range(k):
+            running_total[j] *= (1 - min_diff)
+      return running_total
 
     def start_position(self):
       return self._start
@@ -1149,6 +1181,7 @@ class ConcatSource(BoundedSource):
         elif source_ix == self._end[0] and self._end[1] is None:
           return False
         else:
+          assert source_ix >= self._claimed_source_ix
           self._claimed_source_ix = source_ix
           if source_pos is None:
             return True
@@ -1183,7 +1216,7 @@ class ConcatSource(BoundedSource):
 
           self._end = source_ix, split_pos
           self._cumulative_weights = [min(w / ratio, 1)
-                                        for w in self._cumulative_weights]
+                                      for w in self._cumulative_weights]
           return (source_ix, split_pos), ratio
 
     def set_current_position(self, pos):
@@ -1191,7 +1224,7 @@ class ConcatSource(BoundedSource):
 
     def position_at_fraction(self, fraction):
       source_ix, source_frac = self.global_to_local(fraction)
-      if source_ix == len(self._sources):
+      if source_ix == len(self._source_bundles):
         return (source_ix, None)
       else:
         return (source_ix,
@@ -1200,21 +1233,27 @@ class ConcatSource(BoundedSource):
 
     def fraction_consumed(self):
       with self._lock:
-        return self.local_to_global(self._claimed_source_ix,
-                                    self.sub_range_tracker(
-                                        self._claimed_source_ix)
-                                        .fraction_consumed())
+        return self.local_to_global(
+            self._claimed_source_ix,
+            self.sub_range_tracker(self._claimed_source_ix)
+            .fraction_consumed())
 
     def local_to_global(self, source_ix, source_frac):
       cw = self._cumulative_weights
+      # The global fraction is the fraction to source_ix plus some portion of
+      # the way towards the next source.
       return cw[source_ix] + source_frac * (cw[source_ix + 1] - cw[source_ix])
 
     def global_to_local(self, frac):
       if frac == 1:
-        return (len(self._sources), 0)
+        return (len(self._source_bundles), 0)
       else:
         cw = self._cumulative_weights
+        # Find the last source that starts at or before frac.
         source_ix = bisect.bisect(cw, frac) - 1
+        # Return this source, converting what's left of frac after starting
+        # this source into a value in [0.0, 1.0) representing how far we are
+        # towards the next source.
         return (source_ix,
                 (frac - cw[source_ix]) / (cw[source_ix + 1] - cw[source_ix]))
 
@@ -1223,21 +1262,30 @@ class ConcatSource(BoundedSource):
       if self._range_trackers[source_ix] is None:
         with self._lock:
           if self._range_trackers[source_ix] is None:
-            self._range_trackers[source_ix] = (
-                self._sources[source_ix].get_range_tracker(
-                    self._start[1] if source_ix == self._start[0] else None,
-                    self._end[1] if source_ix == self._end[0] else None))
+            source = self._source_bundles[source_ix]
+            if source_ix == self._start[0] and self._start[1] is not None:
+              start = self._start[1]
+            else:
+              start = source.start_position
+            if source_ix == self._end[0] and self._end[1] is not None:
+              stop = self._end[1]
+            else:
+              stop = source.stop_position
+            self._range_trackers[source_ix] = source.source.get_range_tracker(
+                start, stop)
       return self._range_trackers[source_ix]
 
   def __init__(self, sources):
-    self._sources = sources
+    self._source_bundles = [source if isinstance(source, SourceBundle)
+                            else SourceBundle(None, source, None, None)
+                            for source in sources]
 
   @property
   def sources(self):
-    return self._sources
+    return [s.source for s in self._source_bundles]
 
   def estimate_size(self):
-    return sum(s.estimate_size() for s in self._sources)
+    return sum(s.source.estimate_size() for s in self._source_bundles)
 
   def split(
       self, desired_bundle_size=None, start_position=None, stop_position=None):
@@ -1247,19 +1295,21 @@ class ConcatSource(BoundedSource):
           'stop positions to be None. Received %r and %r respectively.' %
           (start_position, stop_position))
 
-    for source in self._sources:
+    for source in self._source_bundles:
       # We assume all sub-sources to produce bundles that specify weight using
       # the same unit. For example, all sub-sources may specify the size in
       # bytes as their weight.
-      for bundle in source.split(desired_bundle_size, None, None):
+      for bundle in source.source.split(
+          desired_bundle_size, source.start_position, source.stop_position):
         yield bundle
 
   def get_range_tracker(self, start_position=None, stop_position=None):
     if start_position is None:
       start_position = (0, None)
     if stop_position is None:
-      stop_position = (len(self._sources), None)
-    return self.ConcatRangeTracker(start_position, stop_position, self._sources)
+      stop_position = (len(self._source_bundles), None)
+    return self.ConcatRangeTracker(
+        start_position, stop_position, self._source_bundles)
 
   def read(self, range_tracker):
     start_source, _ = range_tracker.start_position()
@@ -1269,15 +1319,15 @@ class ConcatSource(BoundedSource):
     for source_ix in range(start_source, stop_source):
       if not range_tracker.try_claim((source_ix, None)):
         break
-      for record in self._sources[source_ix].read(
+      for record in self._source_bundles[source_ix].source.read(
           range_tracker.sub_range_tracker(source_ix)):
         yield record
 
   def default_output_coder(self):
-    if self._sources:
+    if self._source_bundles:
       # Getting coder from the first sub-sources. This assumes all sub-sources
       # to produce the same coder.
-      return self._sources[0].default_output_coder()
+      return self._source_bundles[0].source.default_output_coder()
     else:
       # Defaulting to PickleCoder.
       return super(ConcatSource, self).default_output_coder()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0ffe7ccf/sdks/python/apache_beam/io/sources_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/sources_test.py b/sdks/python/apache_beam/io/sources_test.py
index 4d16560..e6f369e 100644
--- a/sdks/python/apache_beam/io/sources_test.py
+++ b/sdks/python/apache_beam/io/sources_test.py
@@ -104,8 +104,10 @@ class RangeSource(iobase.BoundedSource):
     start, end = self._normalize(start_position, end_position)
     for sub_start in range(start, end, desired_bundle_size):
       sub_end = min(self._end, sub_start + desired_bundle_size)
-      yield SourceBundle(RangeSource(sub_start, sub_end, self._split_freq),
-                         sub_end - sub_start)
+      yield iobase.SourceBundle(
+          sub_end - sub_start,
+          RangeSource(sub_start, sub_end, self._split_freq),
+          None, None)
 
   def get_range_tracker(self, start_position, end_position):
     start, end = self._normalize(start_position, end_position)
@@ -152,7 +154,7 @@ class SourcesTest(unittest.TestCase):
                                   RangeSource(4, 8),
                                   RangeSource(8, 12),
                                   RangeSource(12, 16),
-                                ])
+                                 ])
     self.assertEqual(list(source.read(source.get_range_tracker())),
                      range(16))
     self.assertEqual(list(source.read(source.get_range_tracker((1, None),
@@ -185,11 +187,22 @@ class SourcesTest(unittest.TestCase):
     self.assertEqual(range_tracker.sub_range_tracker(2).try_claim(10), True)
     self.assertEqual(range_tracker.sub_range_tracker(2).try_claim(11), False)
 
+  def test_run_concat_direct(self):
+    source = iobase.ConcatSource([RangeSource(0, 10),
+                                  RangeSource(10, 100),
+                                  RangeSource(100, 1000),
+                                 ])
+    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | beam.Read(source)
+    assert_that(pcoll, equal_to(range(1000)))
+
+    pipeline.run()
+
   def test_conact_source_exhaustive(self):
     source = iobase.ConcatSource([RangeSource(0, 10),
                                   RangeSource(100, 110),
                                   RangeSource(1000, 1010),
-                                ])
+                                 ])
     source_test_utils.assertSplitAtFractionExhaustive(source)
 
 if __name__ == '__main__':


[6/6] incubator-beam git commit: Closes #955

Posted by ro...@apache.org.
Closes #955


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/753cc9c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/753cc9c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/753cc9c2

Branch: refs/heads/python-sdk
Commit: 753cc9c2e432beb4175902b28b3f9f788bc87f14
Parents: 0fa9c4b d189f83
Author: Robert Bradshaw <ro...@google.com>
Authored: Fri Sep 23 13:50:02 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Fri Sep 23 13:50:02 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/concat_source.py     | 264 +++++++++++++++++++
 .../python/apache_beam/io/concat_source_test.py | 222 ++++++++++++++++
 sdks/python/apache_beam/io/filebasedsource.py   |  81 +-----
 .../apache_beam/io/filebasedsource_test.py      |  12 +-
 sdks/python/apache_beam/io/iobase.py            |   4 +-
 sdks/python/apache_beam/io/range_trackers.py    |   2 +-
 sdks/python/apache_beam/io/source_test_utils.py |  27 +-
 sdks/python/apache_beam/io/sources_test.py      |   1 +
 8 files changed, 521 insertions(+), 92 deletions(-)
----------------------------------------------------------------------



[3/6] incubator-beam git commit: Minor cleanups in docstrings and error messages.

Posted by ro...@apache.org.
Minor cleanups in docstrings and error messages.
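
One of the error-message fixes is worth spelling out: the previous code passed
format arguments to the exception constructor logging-style, so the %r was
never interpolated into the message. A standalone illustration of the
difference, using a hypothetical value and only the standard library:

    # Before: the exception carries a tuple of arguments, not a message.
    try:
        raise ValueError('start_offset must be a number. Received: %r', 'abc')
    except ValueError as e:
        print(e)   # ('start_offset must be a number. Received: %r', 'abc')

    # After: format the message first, and raise TypeError since a wrong type
    # is the actual problem.
    try:
        raise TypeError('start_offset must be a number. Received: %r' % 'abc')
    except TypeError as e:
        print(e)   # start_offset must be a number. Received: 'abc'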


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c1f42e62
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c1f42e62
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c1f42e62

Branch: refs/heads/python-sdk
Commit: c1f42e6203de8d0576dbbf324c22edf6aac0ca64
Parents: 0fa9c4b
Author: Robert Bradshaw <ro...@google.com>
Authored: Tue Sep 13 16:25:33 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Fri Sep 23 13:44:57 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/filebasedsource.py   | 25 +++++++++-----------
 .../apache_beam/io/filebasedsource_test.py      | 10 ++++----
 sdks/python/apache_beam/io/iobase.py            |  4 ++--
 sdks/python/apache_beam/io/range_trackers.py    |  2 +-
 sdks/python/apache_beam/io/source_test_utils.py | 25 ++++++++++----------
 5 files changed, 31 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1f42e62/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 14de140..5204f04 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -54,8 +54,8 @@ class _ConcatSource(iobase.BoundedSource):
     if start_position or stop_position:
       raise ValueError(
           'Multi-level initial splitting is not supported. Expected start and '
-          'stop positions to be None. Received %r and %r respectively.',
-          start_position, stop_position)
+          'stop positions to be None. Received %r and %r respectively.' %
+          (start_position, stop_position))
 
     for source in self._sources:
       # We assume all sub-sources to produce bundles that specify weight using
@@ -224,25 +224,22 @@ class FileBasedSource(iobase.BoundedSource):
 
 
 class _SingleFileSource(iobase.BoundedSource):
-  """Denotes a source for a specific file type.
-
-  This should be sub-classed to add support for reading a new file type.
-  """
+  """Denotes a source for a specific file type."""
 
   def __init__(self, file_based_source, file_name, start_offset, stop_offset,
                min_bundle_size=0):
-    if not (isinstance(start_offset, int) or isinstance(start_offset, long)):
-      raise ValueError(
-          'start_offset must be a number. Received: %r', start_offset)
+    if not isinstance(start_offset, (int, long)):
+      raise TypeError(
+          'start_offset must be a number. Received: %r' % start_offset)
     if stop_offset != range_trackers.OffsetRangeTracker.OFFSET_INFINITY:
-      if not (isinstance(stop_offset, int) or isinstance(stop_offset, long)):
-        raise ValueError(
-            'stop_offset must be a number. Received: %r', stop_offset)
+      if not isinstance(stop_offset, (int, long)):
+        raise TypeError(
+            'stop_offset must be a number. Received: %r' % stop_offset)
       if start_offset >= stop_offset:
         raise ValueError(
             'start_offset must be smaller than stop_offset. Received %d and %d '
-            'for start and stop offsets respectively',
-            start_offset, stop_offset)
+            'for start and stop offsets respectively' %
+            (start_offset, stop_offset))
 
     self._file_name = file_name
     self._is_gcs_file = file_name.startswith('gs://') if file_name else False

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1f42e62/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index c4ad026..50f0a22 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -384,19 +384,19 @@ class TestSingleFileSource(unittest.TestCase):
 
     fbs = LineSource('dymmy_pattern')
 
-    with self.assertRaisesRegexp(ValueError, start_not_a_number_error):
+    with self.assertRaisesRegexp(TypeError, start_not_a_number_error):
       SingleFileSource(
           fbs, file_name='dummy_file', start_offset='aaa', stop_offset='bbb')
-    with self.assertRaisesRegexp(ValueError, start_not_a_number_error):
+    with self.assertRaisesRegexp(TypeError, start_not_a_number_error):
       SingleFileSource(
           fbs, file_name='dummy_file', start_offset='aaa', stop_offset=100)
-    with self.assertRaisesRegexp(ValueError, stop_not_a_number_error):
+    with self.assertRaisesRegexp(TypeError, stop_not_a_number_error):
       SingleFileSource(
           fbs, file_name='dummy_file', start_offset=100, stop_offset='bbb')
-    with self.assertRaisesRegexp(ValueError, stop_not_a_number_error):
+    with self.assertRaisesRegexp(TypeError, stop_not_a_number_error):
       SingleFileSource(
           fbs, file_name='dummy_file', start_offset=100, stop_offset=None)
-    with self.assertRaisesRegexp(ValueError, start_not_a_number_error):
+    with self.assertRaisesRegexp(TypeError, start_not_a_number_error):
       SingleFileSource(
           fbs, file_name='dummy_file', start_offset=None, stop_offset=100)
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1f42e62/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index e1f364b..4305fb6 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -545,11 +545,11 @@ class RangeTracker(object):
 
   def start_position(self):
     """Returns the starting position of the current range, inclusive."""
-    raise NotImplementedError
+    raise NotImplementedError(type(self))
 
   def stop_position(self):
     """Returns the ending position of the current range, exclusive."""
-    raise NotImplementedError
+    raise NotImplementedError(type(self))
 
   def try_claim(self, position):  # pylint: disable=unused-argument
     """Atomically determines if a record at a split point is within the range.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1f42e62/sdks/python/apache_beam/io/range_trackers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/range_trackers.py b/sdks/python/apache_beam/io/range_trackers.py
index af6f6c8..080e2f3 100644
--- a/sdks/python/apache_beam/io/range_trackers.py
+++ b/sdks/python/apache_beam/io/range_trackers.py
@@ -138,10 +138,10 @@ class OffsetRangeTracker(iobase.RangeTracker):
         return
 
       logging.debug('Agreeing to split %r at %d', self, split_offset)
-      self._stop_offset = split_offset
 
       split_fraction = (float(split_offset - self._start_offset) / (
           self._stop_offset - self._start_offset))
+      self._stop_offset = split_offset
 
       return self._stop_offset, split_fraction
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1f42e62/sdks/python/apache_beam/io/source_test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/source_test_utils.py b/sdks/python/apache_beam/io/source_test_utils.py
index 4e3a3e3..f52f16b 100644
--- a/sdks/python/apache_beam/io/source_test_utils.py
+++ b/sdks/python/apache_beam/io/source_test_utils.py
@@ -201,43 +201,42 @@ def _assertSplitAtFractionBehavior(
   if split_result is not None:
     if len(split_result) != 2:
       raise ValueError('Split result must be a tuple that contains split '
-                       'position and split fraction. Received: %r',
-                       split_result)
+                       'position and split fraction. Received: %r' %
+                       (split_result,))
 
     if range_tracker.stop_position() != split_result[0]:
       raise ValueError('After a successful split, the stop position of the '
                        'RangeTracker must be the same as the returned split '
-                       'position. Observed %r and %r which are different.',
-                       range_tracker.stop_position(), split_result[0])
+                       'position. Observed %r and %r which are different.' %
+                       (range_tracker.stop_position(), split_result[0]))
 
     if split_fraction < 0 or split_fraction > 1:
-      raise ValueError('Split fraction must be within the range [0,1]',
-                       'Observed split fraction was %r.', split_result[1])
+      raise ValueError('Split fraction must be within the range [0,1]. '
+                       'Observed split fraction was %r.' % (split_result[1],))
 
   stop_position_after_split = range_tracker.stop_position()
   if split_result and stop_position_after_split == stop_position_before_split:
     raise ValueError('Stop position %r did not change after a successful '
-                     'split of source %r at fraction %r.',
-                     stop_position_before_split, source, split_fraction)
+                     'split of source %r at fraction %r.' %
+                     (stop_position_before_split, source, split_fraction))
 
   if expected_outcome == ExpectedSplitOutcome.MUST_SUCCEED_AND_BE_CONSISTENT:
     if not split_result:
       raise ValueError('Expected split of source %r at fraction %r to be '
                        'successful after reading %d elements. But '
-                       'the split failed.',
-                       source, split_fraction,
-                       num_items_to_read_before_split)
+                       'the split failed.' %
+                       (source, split_fraction, num_items_to_read_before_split))
   elif expected_outcome == ExpectedSplitOutcome.MUST_FAIL:
     if split_result:
       raise ValueError('Expected split of source %r at fraction %r after '
                        'reading %d elements to fail. But splitting '
-                       'succeeded with result %r.',
-                       source, split_fraction,
-                       num_items_to_read_before_split, split_result)
+                       'succeeded with result %r.' %
+                       (source, split_fraction, num_items_to_read_before_split,
+                        split_result))
 
   elif (expected_outcome !=
         ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS):
-    raise ValueError('Unknown type of expected outcome: %r',
+    raise ValueError('Unknown type of expected outcome: %r' %
                      expected_outcome)
   current_items.extend([value for value in reader_iter])
 


[5/6] incubator-beam git commit: Move ConcatSource to iobase.

Posted by ro...@apache.org.
Move ConcatSource to iobase.
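
[Editorial note, not part of the commit: after this change ConcatSource is
imported from apache_beam.io.iobase instead of apache_beam.io.filebasedsource,
and the tests below switch to the new path. A minimal sketch of the new
import, assuming the RangeSource helper defined in sources_test.py is
importable for illustration:]

    from apache_beam.io import iobase
    from apache_beam.io.sources_test import RangeSource  # test helper, assumed importable

    source = iobase.ConcatSource([RangeSource(0, 4), RangeSource(4, 8)])
    print(list(source.read(source.get_range_tracker())))  # [0, 1, 2, 3, 4, 5, 6, 7]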


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d65d17aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d65d17aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d65d17aa

Branch: refs/heads/python-sdk
Commit: d65d17aa1e5ecbeed20cbcf0edbe11280acf0262
Parents: c7d5a37
Author: Robert Bradshaw <ro...@google.com>
Authored: Tue Sep 13 16:49:22 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Fri Sep 23 13:44:57 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/filebasedsource.py   | 189 +------------------
 .../apache_beam/io/filebasedsource_test.py      |   2 +-
 sdks/python/apache_beam/io/iobase.py            | 184 ++++++++++++++++++
 sdks/python/apache_beam/io/sources_test.py      |  19 +-
 4 files changed, 195 insertions(+), 199 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d65d17aa/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index dd91ef2..2911b9f 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -25,9 +25,7 @@ for more details.
 For an example implementation of ``FileBasedSource`` see ``avroio.AvroSource``.
 """
 
-import bisect
 from multiprocessing.pool import ThreadPool
-import threading
 
 from apache_beam.internal import pickler
 from apache_beam.io import fileio
@@ -37,191 +35,6 @@ from apache_beam.io import range_trackers
 MAX_NUM_THREADS_FOR_SIZE_ESTIMATION = 25
 
 
-class ConcatSource(iobase.BoundedSource):
-  """A ``BoundedSource`` that can group a set of ``BoundedSources``."""
-
-  class ConcatRangeTracker(iobase.RangeTracker):
-
-    def __init__(self, start, end, sources, weights=None):
-      super(ConcatSource.ConcatRangeTracker, self).__init__()
-      self._start = start
-      self._end = end
-      self._lock = threading.RLock()
-      self._sources = sources
-      self._range_trackers = [None] * len(self._sources)
-      self._claimed_source_ix = self._start[0]
-
-      if weights is None:
-        n = max(1, end[0] - start[0])
-        self._cumulative_weights = [
-          max(0, min(1, float(k) / n))
-          for k in range(-start[0], len(self._sources) - start[0] + 1)]
-      else:
-        assert len(sources) == len(weights)
-        relevant_weights = weights[start[0]:end[0] + 1]
-        # TODO(robertwb): Implement fraction-at-position to properly scale
-        # partial start and end sources.
-        total = sum(relevant_weights)
-        running_total = [0]
-        for w in relevant_weights:
-          running_total.append(max(1, running_total[-1] + w / total))
-        running_total[-1] = 1  # In case of rounding error.
-        self._cumulative_weights = (
-          [0] * start[0]
-          + running_total
-          + [1] * (len(self._sources) - end[0]))
-
-    def start_position(self):
-      return self._start
-
-    def stop_position(self):
-      return self._end
-
-    def try_claim(self, pos):
-      source_ix, source_pos = pos
-      with self._lock:
-        if source_ix > self._end[0]:
-          return False
-        elif source_ix == self._end[0] and self._end[1] is None:
-          return False
-        else:
-          self._claimed_source_ix = source_ix
-          if source_pos is None:
-            return True
-          else:
-            return self.sub_range_tracker(source_ix).try_claim(source_pos)
-
-    def try_split(self, pos):
-      source_ix, source_pos = pos
-      with self._lock:
-        if source_ix < self._claimed_source_ix:
-          # Already claimed.
-          return None
-        elif source_ix > self._end[0]:
-          # After end.
-          return None
-        elif source_ix == self._end[0] and self._end[1] is None:
-          # At/after end.
-          return None
-        else:
-          if source_ix > self._claimed_source_ix:
-            # Prefer to split on even boundary.
-            split_pos = None
-            ratio = self._cumulative_weights[source_ix]
-          else:
-            # Split the current subsource.
-            split = self.sub_range_tracker(source_ix).try_split(
-                source_pos)
-            if not split:
-              return None
-            split_pos, frac = split
-            ratio = self.local_to_global(source_ix, frac)
-
-          self._end = source_ix, split_pos
-          self._cumulative_weights = [min(w / ratio, 1)
-                                        for w in self._cumulative_weights]
-          return (source_ix, split_pos), ratio
-
-    def set_current_position(self, pos):
-      raise NotImplementedError('Should only be called on sub-trackers')
-
-    def position_at_fraction(self, fraction):
-      source_ix, source_frac = self.global_to_local(fraction)
-      if source_ix == len(self._sources):
-        return (source_ix, None)
-      else:
-        return (source_ix,
-                self.sub_range_tracker(source_ix).position_at_fraction(
-                    source_frac))
-
-    def fraction_consumed(self):
-      with self._lock:
-        return self.local_to_global(self._claimed_source_ix,
-                                    self.sub_range_tracker(
-                                        self._claimed_source_ix)
-                                        .fraction_consumed())
-
-    def local_to_global(self, source_ix, source_frac):
-      cw = self._cumulative_weights
-      return cw[source_ix] + source_frac * (cw[source_ix + 1] - cw[source_ix])
-
-    def global_to_local(self, frac):
-      if frac == 1:
-        return (len(self._sources), 0)
-      else:
-        cw = self._cumulative_weights
-        source_ix = bisect.bisect(cw, frac) - 1
-        return (source_ix,
-                (frac - cw[source_ix]) / (cw[source_ix + 1] - cw[source_ix]))
-
-    def sub_range_tracker(self, source_ix):
-      assert self._start[0] <= source_ix <= self._end[0]
-      if self._range_trackers[source_ix] is None:
-        with self._lock:
-          if self._range_trackers[source_ix] is None:
-            self._range_trackers[source_ix] = (
-                self._sources[source_ix].get_range_tracker(
-                    self._start[1] if source_ix == self._start[0] else None,
-                    self._end[1] if source_ix == self._end[0] else None))
-      return self._range_trackers[source_ix]
-
-  def __init__(self, sources):
-    self._sources = sources
-
-  @property
-  def sources(self):
-    return self._sources
-
-  def estimate_size(self):
-    return sum(s.estimate_size() for s in self._sources)
-
-  def split(
-      self, desired_bundle_size=None, start_position=None, stop_position=None):
-    if start_position or stop_position:
-      raise ValueError(
-          'Multi-level initial splitting is not supported. Expected start and '
-          'stop positions to be None. Received %r and %r respectively.' %
-          (start_position, stop_position))
-
-    for source in self._sources:
-      # We assume all sub-sources to produce bundles that specify weight using
-      # the same unit. For example, all sub-sources may specify the size in
-      # bytes as their weight.
-      for bundle in source.split(desired_bundle_size, None, None):
-        yield bundle
-
-  def get_range_tracker(self, start_position=None, stop_position=None):
-    if start_position is None:
-      start_position = (0, None)
-    if stop_position is None:
-      stop_position = (len(self._sources), None)
-    return self.ConcatRangeTracker(start_position, stop_position, self._sources)
-
-  def read(self, range_tracker):
-    start_source, _ = range_tracker.start_position()
-    stop_source, stop_pos = range_tracker.stop_position()
-    if stop_pos is not None:
-      stop_source += 1
-    for source_ix in range(start_source, stop_source):
-      if not range_tracker.try_claim((source_ix, None)):
-        break
-      for record in self._sources[source_ix].read(
-          range_tracker.sub_range_tracker(source_ix)):
-        yield record
-
-  def default_output_coder(self):
-    if self._sources:
-      # Getting coder from the first sub-sources. This assumes all sub-sources
-      # to produce the same coder.
-      return self._sources[0].default_output_coder()
-    else:
-      # Defaulting to PickleCoder.
-      return super(ConcatSource, self).default_output_coder()
-
-
-_ConcatSource = ConcatSource
-
-
 class FileBasedSource(iobase.BoundedSource):
   """A ``BoundedSource`` for reading a file glob of a given type."""
 
@@ -294,7 +107,7 @@ class FileBasedSource(iobase.BoundedSource):
             sizes[index],
             min_bundle_size=self._min_bundle_size)
         single_file_sources.append(single_file_source)
-      self._concat_source = ConcatSource(single_file_sources)
+      self._concat_source = iobase.ConcatSource(single_file_sources)
     return self._concat_source
 
   def open_file(self, file_name):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d65d17aa/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index 50f0a22..3efcc16 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -30,7 +30,7 @@ from apache_beam.io import iobase
 from apache_beam.io import range_trackers
 
 # importing following private classes for testing
-from apache_beam.io.filebasedsource import _ConcatSource as ConcatSource
+from apache_beam.io.iobase import ConcatSource
 from apache_beam.io.filebasedsource import _SingleFileSource as SingleFileSource
 
 from apache_beam.io.filebasedsource import FileBasedSource

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d65d17aa/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 4305fb6..1205a26 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -31,8 +31,10 @@ the sink.
 
 from collections import namedtuple
 
+import bisect
 import logging
 import random
+import threading
 import uuid
 
 from apache_beam import pvalue
@@ -1097,3 +1099,185 @@ class _RoundRobinKeyFn(core.DoFn):
     if self.counter >= self.count:
       self.counter -= self.count
     yield self.counter, context.element
+
+
+class ConcatSource(BoundedSource):
+  """A ``BoundedSource`` that can group a set of ``BoundedSources``."""
+
+  class ConcatRangeTracker(RangeTracker):
+
+    def __init__(self, start, end, sources, weights=None):
+      super(ConcatSource.ConcatRangeTracker, self).__init__()
+      self._start = start
+      self._end = end
+      self._lock = threading.RLock()
+      self._sources = sources
+      self._range_trackers = [None] * len(self._sources)
+      self._claimed_source_ix = self._start[0]
+
+      if weights is None:
+        n = max(1, end[0] - start[0])
+        self._cumulative_weights = [
+          max(0, min(1, float(k) / n))
+          for k in range(-start[0], len(self._sources) - start[0] + 1)]
+      else:
+        assert len(sources) == len(weights)
+        relevant_weights = weights[start[0]:end[0] + 1]
+        # TODO(robertwb): Implement fraction-at-position to properly scale
+        # partial start and end sources.
+        total = sum(relevant_weights)
+        running_total = [0]
+        for w in relevant_weights:
+          running_total.append(max(1, running_total[-1] + w / total))
+        running_total[-1] = 1  # In case of rounding error.
+        self._cumulative_weights = (
+          [0] * start[0]
+          + running_total
+          + [1] * (len(self._sources) - end[0]))
+
+    def start_position(self):
+      return self._start
+
+    def stop_position(self):
+      return self._end
+
+    def try_claim(self, pos):
+      source_ix, source_pos = pos
+      with self._lock:
+        if source_ix > self._end[0]:
+          return False
+        elif source_ix == self._end[0] and self._end[1] is None:
+          return False
+        else:
+          self._claimed_source_ix = source_ix
+          if source_pos is None:
+            return True
+          else:
+            return self.sub_range_tracker(source_ix).try_claim(source_pos)
+
+    def try_split(self, pos):
+      source_ix, source_pos = pos
+      with self._lock:
+        if source_ix < self._claimed_source_ix:
+          # Already claimed.
+          return None
+        elif source_ix > self._end[0]:
+          # After end.
+          return None
+        elif source_ix == self._end[0] and self._end[1] is None:
+          # At/after end.
+          return None
+        else:
+          if source_ix > self._claimed_source_ix:
+            # Prefer to split on even boundary.
+            split_pos = None
+            ratio = self._cumulative_weights[source_ix]
+          else:
+            # Split the current subsource.
+            split = self.sub_range_tracker(source_ix).try_split(
+                source_pos)
+            if not split:
+              return None
+            split_pos, frac = split
+            ratio = self.local_to_global(source_ix, frac)
+
+          self._end = source_ix, split_pos
+          self._cumulative_weights = [min(w / ratio, 1)
+                                        for w in self._cumulative_weights]
+          return (source_ix, split_pos), ratio
+
+    def set_current_position(self, pos):
+      raise NotImplementedError('Should only be called on sub-trackers')
+
+    def position_at_fraction(self, fraction):
+      source_ix, source_frac = self.global_to_local(fraction)
+      if source_ix == len(self._sources):
+        return (source_ix, None)
+      else:
+        return (source_ix,
+                self.sub_range_tracker(source_ix).position_at_fraction(
+                    source_frac))
+
+    def fraction_consumed(self):
+      with self._lock:
+        return self.local_to_global(self._claimed_source_ix,
+                                    self.sub_range_tracker(
+                                        self._claimed_source_ix)
+                                        .fraction_consumed())
+
+    def local_to_global(self, source_ix, source_frac):
+      cw = self._cumulative_weights
+      return cw[source_ix] + source_frac * (cw[source_ix + 1] - cw[source_ix])
+
+    def global_to_local(self, frac):
+      if frac == 1:
+        return (len(self._sources), 0)
+      else:
+        cw = self._cumulative_weights
+        source_ix = bisect.bisect(cw, frac) - 1
+        return (source_ix,
+                (frac - cw[source_ix]) / (cw[source_ix + 1] - cw[source_ix]))
+
+    def sub_range_tracker(self, source_ix):
+      assert self._start[0] <= source_ix <= self._end[0]
+      if self._range_trackers[source_ix] is None:
+        with self._lock:
+          if self._range_trackers[source_ix] is None:
+            self._range_trackers[source_ix] = (
+                self._sources[source_ix].get_range_tracker(
+                    self._start[1] if source_ix == self._start[0] else None,
+                    self._end[1] if source_ix == self._end[0] else None))
+      return self._range_trackers[source_ix]
+
+  def __init__(self, sources):
+    self._sources = sources
+
+  @property
+  def sources(self):
+    return self._sources
+
+  def estimate_size(self):
+    return sum(s.estimate_size() for s in self._sources)
+
+  def split(
+      self, desired_bundle_size=None, start_position=None, stop_position=None):
+    if start_position or stop_position:
+      raise ValueError(
+          'Multi-level initial splitting is not supported. Expected start and '
+          'stop positions to be None. Received %r and %r respectively.' %
+          (start_position, stop_position))
+
+    for source in self._sources:
+      # We assume all sub-sources to produce bundles that specify weight using
+      # the same unit. For example, all sub-sources may specify the size in
+      # bytes as their weight.
+      for bundle in source.split(desired_bundle_size, None, None):
+        yield bundle
+
+  def get_range_tracker(self, start_position=None, stop_position=None):
+    if start_position is None:
+      start_position = (0, None)
+    if stop_position is None:
+      stop_position = (len(self._sources), None)
+    return self.ConcatRangeTracker(start_position, stop_position, self._sources)
+
+  def read(self, range_tracker):
+    start_source, _ = range_tracker.start_position()
+    stop_source, stop_pos = range_tracker.stop_position()
+    if stop_pos is not None:
+      stop_source += 1
+    for source_ix in range(start_source, stop_source):
+      if not range_tracker.try_claim((source_ix, None)):
+        break
+      for record in self._sources[source_ix].read(
+          range_tracker.sub_range_tracker(source_ix)):
+        yield record
+
+  def default_output_coder(self):
+    if self._sources:
+      # Getting coder from the first sub-sources. This assumes all sub-sources
+      # to produce the same coder.
+      return self._sources[0].default_output_coder()
+    else:
+      # Defaulting to PickleCoder.
+      return super(ConcatSource, self).default_output_coder()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d65d17aa/sdks/python/apache_beam/io/sources_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/sources_test.py b/sdks/python/apache_beam/io/sources_test.py
index 3f83d80..4d16560 100644
--- a/sdks/python/apache_beam/io/sources_test.py
+++ b/sdks/python/apache_beam/io/sources_test.py
@@ -26,7 +26,6 @@ import apache_beam as beam
 
 from apache_beam import coders
 from apache_beam.io import iobase
-from apache_beam.io.filebasedsource import ConcatSource
 from apache_beam.io import range_trackers
 from apache_beam.io import source_test_utils
 from apache_beam.transforms.util import assert_that
@@ -149,11 +148,11 @@ class SourcesTest(unittest.TestCase):
     source_test_utils.assertSplitAtFractionExhaustive(RangeSource(0, 10, 3))
 
   def test_conact_source(self):
-    source = ConcatSource([RangeSource(0, 4),
-                           RangeSource(4, 8),
-                           RangeSource(8, 12),
-                           RangeSource(12, 16),
-                          ])
+    source = iobase.ConcatSource([RangeSource(0, 4),
+                                  RangeSource(4, 8),
+                                  RangeSource(8, 12),
+                                  RangeSource(12, 16),
+                                ])
     self.assertEqual(list(source.read(source.get_range_tracker())),
                      range(16))
     self.assertEqual(list(source.read(source.get_range_tracker((1, None),
@@ -187,10 +186,10 @@ class SourcesTest(unittest.TestCase):
     self.assertEqual(range_tracker.sub_range_tracker(2).try_claim(11), False)
 
   def test_conact_source_exhaustive(self):
-    source = ConcatSource([RangeSource(0, 10),
-                           RangeSource(100, 110),
-                           RangeSource(1000, 1010),
-                          ])
+    source = iobase.ConcatSource([RangeSource(0, 10),
+                                  RangeSource(100, 110),
+                                  RangeSource(1000, 1010),
+                                ])
     source_test_utils.assertSplitAtFractionExhaustive(source)
 
 if __name__ == '__main__':


[4/6] incubator-beam git commit: Implement liquid sharding for concat source.

Posted by ro...@apache.org.
Implement liquid sharding for concat source.
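
[Editorial note, not part of the commit: "liquid sharding" here means dynamic
work rebalancing -- the new ConcatRangeTracker lets a runner split off the
unread tail of the concatenated sources while a read is in progress, by
calling try_split with a (source_index, position) pair. A minimal sketch of
the behaviour exercised by test_conact_source below, assuming the RangeSource
helper added in the same sources_test.py hunk is in scope:]

    from apache_beam.io.filebasedsource import ConcatSource  # location as of this commit

    # Four sub-sources of four elements each; weights default to equal shares.
    source = ConcatSource([RangeSource(0, 4), RangeSource(4, 8),
                           RangeSource(8, 12), RangeSource(12, 16)])
    tracker = source.get_range_tracker(None, None)

    # Simulate a read that has claimed sub-source 0 up to position 2.
    tracker.try_claim((0, None))
    tracker.sub_range_tracker(0).try_claim(2)
    print(tracker.fraction_consumed())   # 0.125

    # Split at the start of the last sub-source: the range now ends at
    # (3, None) and the primary keeps 75% of the original work.
    print(tracker.try_split((3, 14)))    # ((3, None), 0.75)
    print(tracker.try_claim((3, None)))  # False -- beyond the new end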


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c7d5a37d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c7d5a37d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c7d5a37d

Branch: refs/heads/python-sdk
Commit: c7d5a37dce37dacd906202191705054032426ba5
Parents: c1f42e6
Author: Robert Bradshaw <ro...@google.com>
Authored: Tue Sep 13 16:37:06 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Fri Sep 23 13:44:57 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/filebasedsource.py | 166 ++++++++++++++++++---
 sdks/python/apache_beam/io/sources_test.py    |  89 +++++++++++
 2 files changed, 238 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c7d5a37d/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 5204f04..dd91ef2 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -25,20 +25,146 @@ for more details.
 For an example implementation of ``FileBasedSource`` see ``avroio.AvroSource``.
 """
 
+import bisect
 from multiprocessing.pool import ThreadPool
+import threading
 
 from apache_beam.internal import pickler
 from apache_beam.io import fileio
 from apache_beam.io import iobase
-
-import range_trackers
+from apache_beam.io import range_trackers
 
 MAX_NUM_THREADS_FOR_SIZE_ESTIMATION = 25
 
 
-class _ConcatSource(iobase.BoundedSource):
+class ConcatSource(iobase.BoundedSource):
   """A ``BoundedSource`` that can group a set of ``BoundedSources``."""
 
+  class ConcatRangeTracker(iobase.RangeTracker):
+
+    def __init__(self, start, end, sources, weights=None):
+      super(ConcatSource.ConcatRangeTracker, self).__init__()
+      self._start = start
+      self._end = end
+      self._lock = threading.RLock()
+      self._sources = sources
+      self._range_trackers = [None] * len(self._sources)
+      self._claimed_source_ix = self._start[0]
+
+      if weights is None:
+        n = max(1, end[0] - start[0])
+        self._cumulative_weights = [
+          max(0, min(1, float(k) / n))
+          for k in range(-start[0], len(self._sources) - start[0] + 1)]
+      else:
+        assert len(sources) == len(weights)
+        relevant_weights = weights[start[0]:end[0] + 1]
+        # TODO(robertwb): Implement fraction-at-position to properly scale
+        # partial start and end sources.
+        total = sum(relevant_weights)
+        running_total = [0]
+        for w in relevant_weights:
+          running_total.append(max(1, running_total[-1] + w / total))
+        running_total[-1] = 1  # In case of rounding error.
+        self._cumulative_weights = (
+          [0] * start[0]
+          + running_total
+          + [1] * (len(self._sources) - end[0]))
+
+    def start_position(self):
+      return self._start
+
+    def stop_position(self):
+      return self._end
+
+    def try_claim(self, pos):
+      source_ix, source_pos = pos
+      with self._lock:
+        if source_ix > self._end[0]:
+          return False
+        elif source_ix == self._end[0] and self._end[1] is None:
+          return False
+        else:
+          self._claimed_source_ix = source_ix
+          if source_pos is None:
+            return True
+          else:
+            return self.sub_range_tracker(source_ix).try_claim(source_pos)
+
+    def try_split(self, pos):
+      source_ix, source_pos = pos
+      with self._lock:
+        if source_ix < self._claimed_source_ix:
+          # Already claimed.
+          return None
+        elif source_ix > self._end[0]:
+          # After end.
+          return None
+        elif source_ix == self._end[0] and self._end[1] is None:
+          # At/after end.
+          return None
+        else:
+          if source_ix > self._claimed_source_ix:
+            # Prefer to split on even boundary.
+            split_pos = None
+            ratio = self._cumulative_weights[source_ix]
+          else:
+            # Split the current subsource.
+            split = self.sub_range_tracker(source_ix).try_split(
+                source_pos)
+            if not split:
+              return None
+            split_pos, frac = split
+            ratio = self.local_to_global(source_ix, frac)
+
+          self._end = source_ix, split_pos
+          self._cumulative_weights = [min(w / ratio, 1)
+                                        for w in self._cumulative_weights]
+          return (source_ix, split_pos), ratio
+
+    def set_current_position(self, pos):
+      raise NotImplementedError('Should only be called on sub-trackers')
+
+    def position_at_fraction(self, fraction):
+      source_ix, source_frac = self.global_to_local(fraction)
+      if source_ix == len(self._sources):
+        return (source_ix, None)
+      else:
+        return (source_ix,
+                self.sub_range_tracker(source_ix).position_at_fraction(
+                    source_frac))
+
+    def fraction_consumed(self):
+      with self._lock:
+        return self.local_to_global(self._claimed_source_ix,
+                                    self.sub_range_tracker(
+                                        self._claimed_source_ix)
+                                        .fraction_consumed())
+
+    def local_to_global(self, source_ix, source_frac):
+      cw = self._cumulative_weights
+      return cw[source_ix] + source_frac * (cw[source_ix + 1] - cw[source_ix])
+
+    def global_to_local(self, frac):
+      if frac == 1:
+        return (len(self._sources), 0)
+      else:
+        cw = self._cumulative_weights
+        source_ix = bisect.bisect(cw, frac) - 1
+        return (source_ix,
+                (frac - cw[source_ix]) / (cw[source_ix + 1] - cw[source_ix]))
+
+    def sub_range_tracker(self, source_ix):
+      assert self._start[0] <= source_ix <= self._end[0]
+      if self._range_trackers[source_ix] is None:
+        with self._lock:
+          if self._range_trackers[source_ix] is None:
+            self._range_trackers[source_ix] = (
+                self._sources[source_ix].get_range_tracker(
+                    self._start[1] if source_ix == self._start[0] else None,
+                    self._end[1] if source_ix == self._end[0] else None))
+      return self._range_trackers[source_ix]
+
   def __init__(self, sources):
     self._sources = sources
 
@@ -64,20 +190,23 @@ class _ConcatSource(iobase.BoundedSource):
       for bundle in source.split(desired_bundle_size, None, None):
         yield bundle
 
-  def get_range_tracker(self, start_position, stop_position):
-    assert start_position is None
-    assert stop_position is None
-    # This will be invoked only when FileBasedSource is read without splitting.
-    # For that case, we only support reading the whole source.
-    return range_trackers.OffsetRangeTracker(0, len(self.sources))
+  def get_range_tracker(self, start_position=None, stop_position=None):
+    if start_position is None:
+      start_position = (0, None)
+    if stop_position is None:
+      stop_position = (len(self._sources), None)
+    return self.ConcatRangeTracker(start_position, stop_position, self._sources)
 
   def read(self, range_tracker):
-    for index, sub_source in enumerate(self.sources):
-      if not range_tracker.try_claim(index):
-        return
-
-      sub_source_tracker = sub_source.get_range_tracker(None, None)
-      for record in sub_source.read(sub_source_tracker):
+    start_source, _ = range_tracker.start_position()
+    stop_source, stop_pos = range_tracker.stop_position()
+    if stop_pos is not None:
+      stop_source += 1
+    for source_ix in range(start_source, stop_source):
+      if not range_tracker.try_claim((source_ix, None)):
+        break
+      for record in self._sources[source_ix].read(
+          range_tracker.sub_range_tracker(source_ix)):
         yield record
 
   def default_output_coder(self):
@@ -87,7 +216,10 @@ class _ConcatSource(iobase.BoundedSource):
       return self._sources[0].default_output_coder()
     else:
       # Defaulting to PickleCoder.
-      return super(_ConcatSource, self).default_output_coder()
+      return super(ConcatSource, self).default_output_coder()
+
+
+_ConcatSource = ConcatSource
 
 
 class FileBasedSource(iobase.BoundedSource):
@@ -162,7 +294,7 @@ class FileBasedSource(iobase.BoundedSource):
             sizes[index],
             min_bundle_size=self._min_bundle_size)
         single_file_sources.append(single_file_source)
-      self._concat_source = _ConcatSource(single_file_sources)
+      self._concat_source = ConcatSource(single_file_sources)
     return self._concat_source
 
   def open_file(self, file_name):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c7d5a37d/sdks/python/apache_beam/io/sources_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/sources_test.py b/sdks/python/apache_beam/io/sources_test.py
index 293f437..3f83d80 100644
--- a/sdks/python/apache_beam/io/sources_test.py
+++ b/sdks/python/apache_beam/io/sources_test.py
@@ -26,7 +26,9 @@ import apache_beam as beam
 
 from apache_beam import coders
 from apache_beam.io import iobase
+from apache_beam.io.filebasedsource import ConcatSource
 from apache_beam.io import range_trackers
+from apache_beam.io import source_test_utils
 from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to
 
@@ -80,6 +82,45 @@ class LineSource(iobase.BoundedSource):
     return coders.BytesCoder()
 
 
+class RangeSource(iobase.BoundedSource):
+
+  def __init__(self, start, end, split_freq=1):
+    assert start <= end
+    self._start = start
+    self._end = end
+    self._split_freq = split_freq
+
+  def _normalize(self, start_position, end_position):
+    return (self._start if start_position is None else start_position,
+            self._end if end_position is None else end_position)
+
+  def _round_up(self, index):
+    """Rounds up to the nearest mulitple of split_freq."""
+    return index - index % -self._split_freq
+
+  def estimate_size(self):
+    return self._end - self._start
+
+  def split(self, desired_bundle_size, start_position=None, end_position=None):
+    start, end = self._normalize(start_position, end_position)
+    for sub_start in range(start, end, desired_bundle_size):
+      sub_end = min(self._end, sub_start + desired_bundle_size)
+      yield SourceBundle(RangeSource(sub_start, sub_end, self._split_freq),
+                         sub_end - sub_start)
+
+  def get_range_tracker(self, start_position, end_position):
+    start, end = self._normalize(start_position, end_position)
+    return range_trackers.OffsetRangeTracker(start, end)
+
+  def read(self, range_tracker):
+    for k in range(self._round_up(range_tracker.start_position()),
+                   self._round_up(range_tracker.stop_position())):
+      if k % self._split_freq == 0:
+        if not range_tracker.try_claim(k):
+          return
+      yield k
+
+
 class SourcesTest(unittest.TestCase):
 
   def _create_temp_file(self, contents):
@@ -104,6 +145,54 @@ class SourcesTest(unittest.TestCase):
 
     pipeline.run()
 
+  def test_range_source(self):
+    source_test_utils.assertSplitAtFractionExhaustive(RangeSource(0, 10, 3))
+
+  def test_conact_source(self):
+    source = ConcatSource([RangeSource(0, 4),
+                           RangeSource(4, 8),
+                           RangeSource(8, 12),
+                           RangeSource(12, 16),
+                          ])
+    self.assertEqual(list(source.read(source.get_range_tracker())),
+                     range(16))
+    self.assertEqual(list(source.read(source.get_range_tracker((1, None),
+                                                               (2, 10)))),
+                     range(4, 10))
+    range_tracker = source.get_range_tracker(None, None)
+    self.assertEqual(range_tracker.position_at_fraction(0), (0, 0))
+    self.assertEqual(range_tracker.position_at_fraction(.5), (2, 8))
+    self.assertEqual(range_tracker.position_at_fraction(.625), (2, 10))
+
+    # Simulate a read.
+    self.assertEqual(range_tracker.try_claim((0, None)), True)
+    self.assertEqual(range_tracker.sub_range_tracker(0).try_claim(2), True)
+    self.assertEqual(range_tracker.fraction_consumed(), 0.125)
+
+    self.assertEqual(range_tracker.try_claim((1, None)), True)
+    self.assertEqual(range_tracker.sub_range_tracker(1).try_claim(6), True)
+    self.assertEqual(range_tracker.fraction_consumed(), 0.375)
+    self.assertEqual(range_tracker.try_split((0, 1)), None)
+    self.assertEqual(range_tracker.try_split((1, 5)), None)
+
+    self.assertEqual(range_tracker.try_split((3, 14)), ((3, None), 0.75))
+    self.assertEqual(range_tracker.try_claim((3, None)), False)
+    self.assertEqual(range_tracker.sub_range_tracker(1).try_claim(7), True)
+    self.assertEqual(range_tracker.try_claim((2, None)), True)
+    self.assertEqual(range_tracker.sub_range_tracker(2).try_claim(9), True)
+
+    self.assertEqual(range_tracker.try_split((2, 8)), None)
+    self.assertEqual(range_tracker.try_split((2, 11)), ((2, 11), 11. / 12))
+    self.assertEqual(range_tracker.sub_range_tracker(2).try_claim(10), True)
+    self.assertEqual(range_tracker.sub_range_tracker(2).try_claim(11), False)
+
+  def test_conact_source_exhaustive(self):
+    source = ConcatSource([RangeSource(0, 10),
+                           RangeSource(100, 110),
+                           RangeSource(1000, 1010),
+                          ])
+    source_test_utils.assertSplitAtFractionExhaustive(source)
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()