You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/09/23 20:54:34 UTC

[5/6] incubator-beam git commit: Move ConcatSource to iobase.

Move ConcatSource to iobase.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d65d17aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d65d17aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d65d17aa

Branch: refs/heads/python-sdk
Commit: d65d17aa1e5ecbeed20cbcf0edbe11280acf0262
Parents: c7d5a37
Author: Robert Bradshaw <ro...@google.com>
Authored: Tue Sep 13 16:49:22 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Fri Sep 23 13:44:57 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/filebasedsource.py   | 189 +------------------
 .../apache_beam/io/filebasedsource_test.py      |   2 +-
 sdks/python/apache_beam/io/iobase.py            | 184 ++++++++++++++++++
 sdks/python/apache_beam/io/sources_test.py      |  19 +-
 4 files changed, 195 insertions(+), 199 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d65d17aa/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index dd91ef2..2911b9f 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -25,9 +25,7 @@ for more details.
 For an example implementation of ``FileBasedSource`` see ``avroio.AvroSource``.
 """
 
-import bisect
 from multiprocessing.pool import ThreadPool
-import threading
 
 from apache_beam.internal import pickler
 from apache_beam.io import fileio
@@ -37,191 +35,6 @@ from apache_beam.io import range_trackers
 MAX_NUM_THREADS_FOR_SIZE_ESTIMATION = 25
 
 
-class ConcatSource(iobase.BoundedSource):
-  """A ``BoundedSource`` that can group a set of ``BoundedSources``."""
-
-  class ConcatRangeTracker(iobase.RangeTracker):
-
-    def __init__(self, start, end, sources, weights=None):
-      super(ConcatSource.ConcatRangeTracker, self).__init__()
-      self._start = start
-      self._end = end
-      self._lock = threading.RLock()
-      self._sources = sources
-      self._range_trackers = [None] * len(self._sources)
-      self._claimed_source_ix = self._start[0]
-
-      if weights is None:
-        n = max(1, end[0] - start[0])
-        self._cumulative_weights = [
-          max(0, min(1, float(k) / n))
-          for k in range(-start[0], len(self._sources) - start[0] + 1)]
-      else:
-        assert len(sources) == len(weights)
-        relevant_weights = weights[start[0]:end[0] + 1]
-        # TODO(robertwb): Implement fraction-at-position to properly scale
-        # partial start and end sources.
-        total = sum(relevant_weights)
-        running_total = [0]
-        for w in relevant_weights:
-          running_total.append(max(1, running_total[-1] + w / total))
-        running_total[-1] = 1  # In case of rounding error.
-        self._cumulative_weights = (
-          [0] * start[0]
-          + running_total
-          + [1] * (len(self._sources) - end[0]))
-
-    def start_position(self):
-      return self._start
-
-    def stop_position(self):
-      return self._end
-
-    def try_claim(self, pos):
-      source_ix, source_pos = pos
-      with self._lock:
-        if source_ix > self._end[0]:
-          return False
-        elif source_ix == self._end[0] and self._end[1] is None:
-          return False
-        else:
-          self._claimed_source_ix = source_ix
-          if source_pos is None:
-            return True
-          else:
-            return self.sub_range_tracker(source_ix).try_claim(source_pos)
-
-    def try_split(self, pos):
-      source_ix, source_pos = pos
-      with self._lock:
-        if source_ix < self._claimed_source_ix:
-          # Already claimed.
-          return None
-        elif source_ix > self._end[0]:
-          # After end.
-          return None
-        elif source_ix == self._end[0] and self._end[1] is None:
-          # At/after end.
-          return None
-        else:
-          if source_ix > self._claimed_source_ix:
-            # Prefer to split on even boundary.
-            split_pos = None
-            ratio = self._cumulative_weights[source_ix]
-          else:
-            # Split the current subsource.
-            split = self.sub_range_tracker(source_ix).try_split(
-                source_pos)
-            if not split:
-              return None
-            split_pos, frac = split
-            ratio = self.local_to_global(source_ix, frac)
-
-          self._end = source_ix, split_pos
-          self._cumulative_weights = [min(w / ratio, 1)
-                                        for w in self._cumulative_weights]
-          return (source_ix, split_pos), ratio
-
-    def set_current_position(self, pos):
-      raise NotImplementedError('Should only be called on sub-trackers')
-
-    def position_at_fraction(self, fraction):
-      source_ix, source_frac = self.global_to_local(fraction)
-      if source_ix == len(self._sources):
-        return (source_ix, None)
-      else:
-        return (source_ix,
-                self.sub_range_tracker(source_ix).position_at_fraction(
-                    source_frac))
-
-    def fraction_consumed(self):
-      with self._lock:
-        return self.local_to_global(self._claimed_source_ix,
-                                    self.sub_range_tracker(
-                                        self._claimed_source_ix)
-                                        .fraction_consumed())
-
-    def local_to_global(self, source_ix, source_frac):
-      cw = self._cumulative_weights
-      return cw[source_ix] + source_frac * (cw[source_ix + 1] - cw[source_ix])
-
-    def global_to_local(self, frac):
-      if frac == 1:
-        return (len(self._sources), 0)
-      else:
-        cw = self._cumulative_weights
-        source_ix = bisect.bisect(cw, frac) - 1
-        return (source_ix,
-                (frac - cw[source_ix]) / (cw[source_ix + 1] - cw[source_ix]))
-
-    def sub_range_tracker(self, source_ix):
-      assert self._start[0] <= source_ix <= self._end[0]
-      if self._range_trackers[source_ix] is None:
-        with self._lock:
-          if self._range_trackers[source_ix] is None:
-            self._range_trackers[source_ix] = (
-                self._sources[source_ix].get_range_tracker(
-                    self._start[1] if source_ix == self._start[0] else None,
-                    self._end[1] if source_ix == self._end[0] else None))
-      return self._range_trackers[source_ix]
-
-  def __init__(self, sources):
-    self._sources = sources
-
-  @property
-  def sources(self):
-    return self._sources
-
-  def estimate_size(self):
-    return sum(s.estimate_size() for s in self._sources)
-
-  def split(
-      self, desired_bundle_size=None, start_position=None, stop_position=None):
-    if start_position or stop_position:
-      raise ValueError(
-          'Multi-level initial splitting is not supported. Expected start and '
-          'stop positions to be None. Received %r and %r respectively.' %
-          (start_position, stop_position))
-
-    for source in self._sources:
-      # We assume all sub-sources to produce bundles that specify weight using
-      # the same unit. For example, all sub-sources may specify the size in
-      # bytes as their weight.
-      for bundle in source.split(desired_bundle_size, None, None):
-        yield bundle
-
-  def get_range_tracker(self, start_position=None, stop_position=None):
-    if start_position is None:
-      start_position = (0, None)
-    if stop_position is None:
-      stop_position = (len(self._sources), None)
-    return self.ConcatRangeTracker(start_position, stop_position, self._sources)
-
-  def read(self, range_tracker):
-    start_source, _ = range_tracker.start_position()
-    stop_source, stop_pos = range_tracker.stop_position()
-    if stop_pos is not None:
-      stop_source += 1
-    for source_ix in range(start_source, stop_source):
-      if not range_tracker.try_claim((source_ix, None)):
-        break
-      for record in self._sources[source_ix].read(
-          range_tracker.sub_range_tracker(source_ix)):
-        yield record
-
-  def default_output_coder(self):
-    if self._sources:
-      # Getting coder from the first sub-sources. This assumes all sub-sources
-      # to produce the same coder.
-      return self._sources[0].default_output_coder()
-    else:
-      # Defaulting to PickleCoder.
-      return super(ConcatSource, self).default_output_coder()
-
-
-_ConcatSource = ConcatSource
-
-
 class FileBasedSource(iobase.BoundedSource):
   """A ``BoundedSource`` for reading a file glob of a given type."""
 
@@ -294,7 +107,7 @@ class FileBasedSource(iobase.BoundedSource):
             sizes[index],
             min_bundle_size=self._min_bundle_size)
         single_file_sources.append(single_file_source)
-      self._concat_source = ConcatSource(single_file_sources)
+      self._concat_source = iobase.ConcatSource(single_file_sources)
     return self._concat_source
 
   def open_file(self, file_name):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d65d17aa/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index 50f0a22..3efcc16 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -30,7 +30,7 @@ from apache_beam.io import iobase
 from apache_beam.io import range_trackers
 
 # importing following private classes for testing
-from apache_beam.io.filebasedsource import _ConcatSource as ConcatSource
+from apache_beam.io.iobase import ConcatSource
 from apache_beam.io.filebasedsource import _SingleFileSource as SingleFileSource
 
 from apache_beam.io.filebasedsource import FileBasedSource

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d65d17aa/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 4305fb6..1205a26 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -31,8 +31,10 @@ the sink.
 
 from collections import namedtuple
 
+import bisect
 import logging
 import random
+import threading
 import uuid
 
 from apache_beam import pvalue
@@ -1097,3 +1099,185 @@ class _RoundRobinKeyFn(core.DoFn):
     if self.counter >= self.count:
       self.counter -= self.count
     yield self.counter, context.element
+
+
+class ConcatSource(BoundedSource):
+  """A ``BoundedSource`` that can group a set of ``BoundedSources``."""
+
+  class ConcatRangeTracker(RangeTracker):
+
+    def __init__(self, start, end, sources, weights=None):
+      super(ConcatSource.ConcatRangeTracker, self).__init__()
+      self._start = start
+      self._end = end
+      self._lock = threading.RLock()
+      self._sources = sources
+      self._range_trackers = [None] * len(self._sources)
+      self._claimed_source_ix = self._start[0]
+
+      if weights is None:
+        n = max(1, end[0] - start[0])
+        self._cumulative_weights = [
+          max(0, min(1, float(k) / n))
+          for k in range(-start[0], len(self._sources) - start[0] + 1)]
+      else:
+        assert len(sources) == len(weights)
+        relevant_weights = weights[start[0]:end[0] + 1]
+        # TODO(robertwb): Implement fraction-at-position to properly scale
+        # partial start and end sources.
+        total = sum(relevant_weights)
+        running_total = [0]
+        for w in relevant_weights:
+          running_total.append(max(1, running_total[-1] + w / total))
+        running_total[-1] = 1  # In case of rounding error.
+        self._cumulative_weights = (
+          [0] * start[0]
+          + running_total
+          + [1] * (len(self._sources) - end[0]))
+
+    def start_position(self):
+      return self._start
+
+    def stop_position(self):
+      return self._end
+
+    def try_claim(self, pos):
+      source_ix, source_pos = pos
+      with self._lock:
+        if source_ix > self._end[0]:
+          return False
+        elif source_ix == self._end[0] and self._end[1] is None:
+          return False
+        else:
+          self._claimed_source_ix = source_ix
+          if source_pos is None:
+            return True
+          else:
+            return self.sub_range_tracker(source_ix).try_claim(source_pos)
+
+    def try_split(self, pos):
+      source_ix, source_pos = pos
+      with self._lock:
+        if source_ix < self._claimed_source_ix:
+          # Already claimed.
+          return None
+        elif source_ix > self._end[0]:
+          # After end.
+          return None
+        elif source_ix == self._end[0] and self._end[1] is None:
+          # At/after end.
+          return None
+        else:
+          if source_ix > self._claimed_source_ix:
+            # Prefer to split on even boundary.
+            split_pos = None
+            ratio = self._cumulative_weights[source_ix]
+          else:
+            # Split the current subsource.
+            split = self.sub_range_tracker(source_ix).try_split(
+                source_pos)
+            if not split:
+              return None
+            split_pos, frac = split
+            ratio = self.local_to_global(source_ix, frac)
+
+          self._end = source_ix, split_pos
+          self._cumulative_weights = [min(w / ratio, 1)
+                                        for w in self._cumulative_weights]
+          return (source_ix, split_pos), ratio
+
+    def set_current_position(self, pos):
+      raise NotImplementedError('Should only be called on sub-trackers')
+
+    def position_at_fraction(self, fraction):
+      source_ix, source_frac = self.global_to_local(fraction)
+      if source_ix == len(self._sources):
+        return (source_ix, None)
+      else:
+        return (source_ix,
+                self.sub_range_tracker(source_ix).position_at_fraction(
+                    source_frac))
+
+    def fraction_consumed(self):
+      with self._lock:
+        return self.local_to_global(self._claimed_source_ix,
+                                    self.sub_range_tracker(
+                                        self._claimed_source_ix)
+                                        .fraction_consumed())
+
+    def local_to_global(self, source_ix, source_frac):
+      cw = self._cumulative_weights
+      return cw[source_ix] + source_frac * (cw[source_ix + 1] - cw[source_ix])
+
+    def global_to_local(self, frac):
+      if frac == 1:
+        return (len(self._sources), 0)
+      else:
+        cw = self._cumulative_weights
+        source_ix = bisect.bisect(cw, frac) - 1
+        return (source_ix,
+                (frac - cw[source_ix]) / (cw[source_ix + 1] - cw[source_ix]))
+
+    def sub_range_tracker(self, source_ix):
+      assert self._start[0] <= source_ix <= self._end[0]
+      if self._range_trackers[source_ix] is None:
+        with self._lock:
+          if self._range_trackers[source_ix] is None:
+            self._range_trackers[source_ix] = (
+                self._sources[source_ix].get_range_tracker(
+                    self._start[1] if source_ix == self._start[0] else None,
+                    self._end[1] if source_ix == self._end[0] else None))
+      return self._range_trackers[source_ix]
+
+  def __init__(self, sources):
+    self._sources = sources
+
+  @property
+  def sources(self):
+    return self._sources
+
+  def estimate_size(self):
+    return sum(s.estimate_size() for s in self._sources)
+
+  def split(
+      self, desired_bundle_size=None, start_position=None, stop_position=None):
+    if start_position or stop_position:
+      raise ValueError(
+          'Multi-level initial splitting is not supported. Expected start and '
+          'stop positions to be None. Received %r and %r respectively.' %
+          (start_position, stop_position))
+
+    for source in self._sources:
+      # We assume that all sub-sources produce bundles whose weights use the
+      # same unit. For example, all sub-sources may specify the size in
+      # bytes as their weight.
+      for bundle in source.split(desired_bundle_size, None, None):
+        yield bundle
+
+  def get_range_tracker(self, start_position=None, stop_position=None):
+    if start_position is None:
+      start_position = (0, None)
+    if stop_position is None:
+      stop_position = (len(self._sources), None)
+    return self.ConcatRangeTracker(start_position, stop_position, self._sources)
+
+  def read(self, range_tracker):
+    start_source, _ = range_tracker.start_position()
+    stop_source, stop_pos = range_tracker.stop_position()
+    if stop_pos is not None:
+      stop_source += 1
+    for source_ix in range(start_source, stop_source):
+      if not range_tracker.try_claim((source_ix, None)):
+        break
+      for record in self._sources[source_ix].read(
+          range_tracker.sub_range_tracker(source_ix)):
+        yield record
+
+  def default_output_coder(self):
+    if self._sources:
+      # Use the coder of the first sub-source. This assumes that all
+      # sub-sources produce the same coder.
+      return self._sources[0].default_output_coder()
+    else:
+      # Defaulting to PickleCoder.
+      return super(ConcatSource, self).default_output_coder()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d65d17aa/sdks/python/apache_beam/io/sources_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/sources_test.py b/sdks/python/apache_beam/io/sources_test.py
index 3f83d80..4d16560 100644
--- a/sdks/python/apache_beam/io/sources_test.py
+++ b/sdks/python/apache_beam/io/sources_test.py
@@ -26,7 +26,6 @@ import apache_beam as beam
 
 from apache_beam import coders
 from apache_beam.io import iobase
-from apache_beam.io.filebasedsource import ConcatSource
 from apache_beam.io import range_trackers
 from apache_beam.io import source_test_utils
 from apache_beam.transforms.util import assert_that
@@ -149,11 +148,11 @@ class SourcesTest(unittest.TestCase):
     source_test_utils.assertSplitAtFractionExhaustive(RangeSource(0, 10, 3))
 
   def test_conact_source(self):
-    source = ConcatSource([RangeSource(0, 4),
-                           RangeSource(4, 8),
-                           RangeSource(8, 12),
-                           RangeSource(12, 16),
-                          ])
+    source = iobase.ConcatSource([RangeSource(0, 4),
+                                  RangeSource(4, 8),
+                                  RangeSource(8, 12),
+                                  RangeSource(12, 16),
+                                ])
     self.assertEqual(list(source.read(source.get_range_tracker())),
                      range(16))
     self.assertEqual(list(source.read(source.get_range_tracker((1, None),
@@ -187,10 +186,10 @@ class SourcesTest(unittest.TestCase):
     self.assertEqual(range_tracker.sub_range_tracker(2).try_claim(11), False)
 
   def test_conact_source_exhaustive(self):
-    source = ConcatSource([RangeSource(0, 10),
-                           RangeSource(100, 110),
-                           RangeSource(1000, 1010),
-                          ])
+    source = iobase.ConcatSource([RangeSource(0, 10),
+                                  RangeSource(100, 110),
+                                  RangeSource(1000, 1010),
+                                ])
     source_test_utils.assertSplitAtFractionExhaustive(source)
 
 if __name__ == '__main__':