You are viewing a plain-text version of this content; the hyperlink to the canonical version is not preserved in this copy.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/09/23 20:54:34 UTC
[5/6] incubator-beam git commit: Move ConcatSource to iobase.
Move ConcatSource to iobase.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d65d17aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d65d17aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d65d17aa
Branch: refs/heads/python-sdk
Commit: d65d17aa1e5ecbeed20cbcf0edbe11280acf0262
Parents: c7d5a37
Author: Robert Bradshaw <ro...@google.com>
Authored: Tue Sep 13 16:49:22 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Fri Sep 23 13:44:57 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/filebasedsource.py | 189 +------------------
.../apache_beam/io/filebasedsource_test.py | 2 +-
sdks/python/apache_beam/io/iobase.py | 184 ++++++++++++++++++
sdks/python/apache_beam/io/sources_test.py | 19 +-
4 files changed, 195 insertions(+), 199 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d65d17aa/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index dd91ef2..2911b9f 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -25,9 +25,7 @@ for more details.
For an example implementation of ``FileBasedSource`` see ``avroio.AvroSource``.
"""
-import bisect
from multiprocessing.pool import ThreadPool
-import threading
from apache_beam.internal import pickler
from apache_beam.io import fileio
@@ -37,191 +35,6 @@ from apache_beam.io import range_trackers
MAX_NUM_THREADS_FOR_SIZE_ESTIMATION = 25
-class ConcatSource(iobase.BoundedSource):
- """A ``BoundedSource`` that can group a set of ``BoundedSources``."""
-
- class ConcatRangeTracker(iobase.RangeTracker):
-
- def __init__(self, start, end, sources, weights=None):
- super(ConcatSource.ConcatRangeTracker, self).__init__()
- self._start = start
- self._end = end
- self._lock = threading.RLock()
- self._sources = sources
- self._range_trackers = [None] * len(self._sources)
- self._claimed_source_ix = self._start[0]
-
- if weights is None:
- n = max(1, end[0] - start[0])
- self._cumulative_weights = [
- max(0, min(1, float(k) / n))
- for k in range(-start[0], len(self._sources) - start[0] + 1)]
- else:
- assert len(sources) == len(weights)
- relevant_weights = weights[start[0]:end[0] + 1]
- # TODO(robertwb): Implement fraction-at-position to properly scale
- # partial start and end sources.
- total = sum(relevant_weights)
- running_total = [0]
- for w in relevant_weights:
- running_total.append(max(1, running_total[-1] + w / total))
- running_total[-1] = 1 # In case of rounding error.
- self._cumulative_weights = (
- [0] * start[0]
- + running_total
- + [1] * (len(self._sources) - end[0]))
-
- def start_position(self):
- return self._start
-
- def stop_position(self):
- return self._end
-
- def try_claim(self, pos):
- source_ix, source_pos = pos
- with self._lock:
- if source_ix > self._end[0]:
- return False
- elif source_ix == self._end[0] and self._end[1] is None:
- return False
- else:
- self._claimed_source_ix = source_ix
- if source_pos is None:
- return True
- else:
- return self.sub_range_tracker(source_ix).try_claim(source_pos)
-
- def try_split(self, pos):
- source_ix, source_pos = pos
- with self._lock:
- if source_ix < self._claimed_source_ix:
- # Already claimed.
- return None
- elif source_ix > self._end[0]:
- # After end.
- return None
- elif source_ix == self._end[0] and self._end[1] is None:
- # At/after end.
- return None
- else:
- if source_ix > self._claimed_source_ix:
- # Prefer to split on even boundary.
- split_pos = None
- ratio = self._cumulative_weights[source_ix]
- else:
- # Split the current subsource.
- split = self.sub_range_tracker(source_ix).try_split(
- source_pos)
- if not split:
- return None
- split_pos, frac = split
- ratio = self.local_to_global(source_ix, frac)
-
- self._end = source_ix, split_pos
- self._cumulative_weights = [min(w / ratio, 1)
- for w in self._cumulative_weights]
- return (source_ix, split_pos), ratio
-
- def set_current_position(self, pos):
- raise NotImplementedError('Should only be called on sub-trackers')
-
- def position_at_fraction(self, fraction):
- source_ix, source_frac = self.global_to_local(fraction)
- if source_ix == len(self._sources):
- return (source_ix, None)
- else:
- return (source_ix,
- self.sub_range_tracker(source_ix).position_at_fraction(
- source_frac))
-
- def fraction_consumed(self):
- with self._lock:
- return self.local_to_global(self._claimed_source_ix,
- self.sub_range_tracker(
- self._claimed_source_ix)
- .fraction_consumed())
-
- def local_to_global(self, source_ix, source_frac):
- cw = self._cumulative_weights
- return cw[source_ix] + source_frac * (cw[source_ix + 1] - cw[source_ix])
-
- def global_to_local(self, frac):
- if frac == 1:
- return (len(self._sources), 0)
- else:
- cw = self._cumulative_weights
- source_ix = bisect.bisect(cw, frac) - 1
- return (source_ix,
- (frac - cw[source_ix]) / (cw[source_ix + 1] - cw[source_ix]))
-
- def sub_range_tracker(self, source_ix):
- assert self._start[0] <= source_ix <= self._end[0]
- if self._range_trackers[source_ix] is None:
- with self._lock:
- if self._range_trackers[source_ix] is None:
- self._range_trackers[source_ix] = (
- self._sources[source_ix].get_range_tracker(
- self._start[1] if source_ix == self._start[0] else None,
- self._end[1] if source_ix == self._end[0] else None))
- return self._range_trackers[source_ix]
-
- def __init__(self, sources):
- self._sources = sources
-
- @property
- def sources(self):
- return self._sources
-
- def estimate_size(self):
- return sum(s.estimate_size() for s in self._sources)
-
- def split(
- self, desired_bundle_size=None, start_position=None, stop_position=None):
- if start_position or stop_position:
- raise ValueError(
- 'Multi-level initial splitting is not supported. Expected start and '
- 'stop positions to be None. Received %r and %r respectively.' %
- (start_position, stop_position))
-
- for source in self._sources:
- # We assume all sub-sources to produce bundles that specify weight using
- # the same unit. For example, all sub-sources may specify the size in
- # bytes as their weight.
- for bundle in source.split(desired_bundle_size, None, None):
- yield bundle
-
- def get_range_tracker(self, start_position=None, stop_position=None):
- if start_position is None:
- start_position = (0, None)
- if stop_position is None:
- stop_position = (len(self._sources), None)
- return self.ConcatRangeTracker(start_position, stop_position, self._sources)
-
- def read(self, range_tracker):
- start_source, _ = range_tracker.start_position()
- stop_source, stop_pos = range_tracker.stop_position()
- if stop_pos is not None:
- stop_source += 1
- for source_ix in range(start_source, stop_source):
- if not range_tracker.try_claim((source_ix, None)):
- break
- for record in self._sources[source_ix].read(
- range_tracker.sub_range_tracker(source_ix)):
- yield record
-
- def default_output_coder(self):
- if self._sources:
- # Getting coder from the first sub-sources. This assumes all sub-sources
- # to produce the same coder.
- return self._sources[0].default_output_coder()
- else:
- # Defaulting to PickleCoder.
- return super(ConcatSource, self).default_output_coder()
-
-
-_ConcatSource = ConcatSource
-
-
class FileBasedSource(iobase.BoundedSource):
"""A ``BoundedSource`` for reading a file glob of a given type."""
@@ -294,7 +107,7 @@ class FileBasedSource(iobase.BoundedSource):
sizes[index],
min_bundle_size=self._min_bundle_size)
single_file_sources.append(single_file_source)
- self._concat_source = ConcatSource(single_file_sources)
+ self._concat_source = iobase.ConcatSource(single_file_sources)
return self._concat_source
def open_file(self, file_name):
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d65d17aa/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index 50f0a22..3efcc16 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -30,7 +30,7 @@ from apache_beam.io import iobase
from apache_beam.io import range_trackers
# importing following private classes for testing
-from apache_beam.io.filebasedsource import _ConcatSource as ConcatSource
+from apache_beam.io.iobase import ConcatSource
from apache_beam.io.filebasedsource import _SingleFileSource as SingleFileSource
from apache_beam.io.filebasedsource import FileBasedSource
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d65d17aa/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 4305fb6..1205a26 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -31,8 +31,10 @@ the sink.
from collections import namedtuple
+import bisect
import logging
import random
+import threading
import uuid
from apache_beam import pvalue
@@ -1097,3 +1099,185 @@ class _RoundRobinKeyFn(core.DoFn):
if self.counter >= self.count:
self.counter -= self.count
yield self.counter, context.element
+
+
+class ConcatSource(BoundedSource):
+ """A ``BoundedSource`` that can group a set of ``BoundedSources``."""
+
+ class ConcatRangeTracker(RangeTracker):
+
+ def __init__(self, start, end, sources, weights=None):
+ super(ConcatSource.ConcatRangeTracker, self).__init__()
+ self._start = start
+ self._end = end
+ self._lock = threading.RLock()
+ self._sources = sources
+ self._range_trackers = [None] * len(self._sources)
+ self._claimed_source_ix = self._start[0]
+
+ if weights is None:
+ n = max(1, end[0] - start[0])
+ self._cumulative_weights = [
+ max(0, min(1, float(k) / n))
+ for k in range(-start[0], len(self._sources) - start[0] + 1)]
+ else:
+ assert len(sources) == len(weights)
+ relevant_weights = weights[start[0]:end[0] + 1]
+ # TODO(robertwb): Implement fraction-at-position to properly scale
+ # partial start and end sources.
+ total = sum(relevant_weights)
+ running_total = [0]
+ for w in relevant_weights:
+ running_total.append(max(1, running_total[-1] + w / total))
+ running_total[-1] = 1 # In case of rounding error.
+ self._cumulative_weights = (
+ [0] * start[0]
+ + running_total
+ + [1] * (len(self._sources) - end[0]))
+
+ def start_position(self):
+ return self._start
+
+ def stop_position(self):
+ return self._end
+
+ def try_claim(self, pos):
+ source_ix, source_pos = pos
+ with self._lock:
+ if source_ix > self._end[0]:
+ return False
+ elif source_ix == self._end[0] and self._end[1] is None:
+ return False
+ else:
+ self._claimed_source_ix = source_ix
+ if source_pos is None:
+ return True
+ else:
+ return self.sub_range_tracker(source_ix).try_claim(source_pos)
+
+ def try_split(self, pos):
+ source_ix, source_pos = pos
+ with self._lock:
+ if source_ix < self._claimed_source_ix:
+ # Already claimed.
+ return None
+ elif source_ix > self._end[0]:
+ # After end.
+ return None
+ elif source_ix == self._end[0] and self._end[1] is None:
+ # At/after end.
+ return None
+ else:
+ if source_ix > self._claimed_source_ix:
+ # Prefer to split on even boundary.
+ split_pos = None
+ ratio = self._cumulative_weights[source_ix]
+ else:
+ # Split the current subsource.
+ split = self.sub_range_tracker(source_ix).try_split(
+ source_pos)
+ if not split:
+ return None
+ split_pos, frac = split
+ ratio = self.local_to_global(source_ix, frac)
+
+ self._end = source_ix, split_pos
+ self._cumulative_weights = [min(w / ratio, 1)
+ for w in self._cumulative_weights]
+ return (source_ix, split_pos), ratio
+
+ def set_current_position(self, pos):
+ raise NotImplementedError('Should only be called on sub-trackers')
+
+ def position_at_fraction(self, fraction):
+ source_ix, source_frac = self.global_to_local(fraction)
+ if source_ix == len(self._sources):
+ return (source_ix, None)
+ else:
+ return (source_ix,
+ self.sub_range_tracker(source_ix).position_at_fraction(
+ source_frac))
+
+ def fraction_consumed(self):
+ with self._lock:
+ return self.local_to_global(self._claimed_source_ix,
+ self.sub_range_tracker(
+ self._claimed_source_ix)
+ .fraction_consumed())
+
+ def local_to_global(self, source_ix, source_frac):
+ cw = self._cumulative_weights
+ return cw[source_ix] + source_frac * (cw[source_ix + 1] - cw[source_ix])
+
+ def global_to_local(self, frac):
+ if frac == 1:
+ return (len(self._sources), 0)
+ else:
+ cw = self._cumulative_weights
+ source_ix = bisect.bisect(cw, frac) - 1
+ return (source_ix,
+ (frac - cw[source_ix]) / (cw[source_ix + 1] - cw[source_ix]))
+
+ def sub_range_tracker(self, source_ix):
+ assert self._start[0] <= source_ix <= self._end[0]
+ if self._range_trackers[source_ix] is None:
+ with self._lock:
+ if self._range_trackers[source_ix] is None:
+ self._range_trackers[source_ix] = (
+ self._sources[source_ix].get_range_tracker(
+ self._start[1] if source_ix == self._start[0] else None,
+ self._end[1] if source_ix == self._end[0] else None))
+ return self._range_trackers[source_ix]
+
+ def __init__(self, sources):
+ self._sources = sources
+
+ @property
+ def sources(self):
+ return self._sources
+
+ def estimate_size(self):
+ return sum(s.estimate_size() for s in self._sources)
+
+ def split(
+ self, desired_bundle_size=None, start_position=None, stop_position=None):
+ if start_position or stop_position:
+ raise ValueError(
+ 'Multi-level initial splitting is not supported. Expected start and '
+ 'stop positions to be None. Received %r and %r respectively.' %
+ (start_position, stop_position))
+
+ for source in self._sources:
+ # We assume all sub-sources to produce bundles that specify weight using
+ # the same unit. For example, all sub-sources may specify the size in
+ # bytes as their weight.
+ for bundle in source.split(desired_bundle_size, None, None):
+ yield bundle
+
+ def get_range_tracker(self, start_position=None, stop_position=None):
+ if start_position is None:
+ start_position = (0, None)
+ if stop_position is None:
+ stop_position = (len(self._sources), None)
+ return self.ConcatRangeTracker(start_position, stop_position, self._sources)
+
+ def read(self, range_tracker):
+ start_source, _ = range_tracker.start_position()
+ stop_source, stop_pos = range_tracker.stop_position()
+ if stop_pos is not None:
+ stop_source += 1
+ for source_ix in range(start_source, stop_source):
+ if not range_tracker.try_claim((source_ix, None)):
+ break
+ for record in self._sources[source_ix].read(
+ range_tracker.sub_range_tracker(source_ix)):
+ yield record
+
+ def default_output_coder(self):
+ if self._sources:
+ # Getting coder from the first sub-sources. This assumes all sub-sources
+ # to produce the same coder.
+ return self._sources[0].default_output_coder()
+ else:
+ # Defaulting to PickleCoder.
+ return super(ConcatSource, self).default_output_coder()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d65d17aa/sdks/python/apache_beam/io/sources_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/sources_test.py b/sdks/python/apache_beam/io/sources_test.py
index 3f83d80..4d16560 100644
--- a/sdks/python/apache_beam/io/sources_test.py
+++ b/sdks/python/apache_beam/io/sources_test.py
@@ -26,7 +26,6 @@ import apache_beam as beam
from apache_beam import coders
from apache_beam.io import iobase
-from apache_beam.io.filebasedsource import ConcatSource
from apache_beam.io import range_trackers
from apache_beam.io import source_test_utils
from apache_beam.transforms.util import assert_that
@@ -149,11 +148,11 @@ class SourcesTest(unittest.TestCase):
source_test_utils.assertSplitAtFractionExhaustive(RangeSource(0, 10, 3))
def test_conact_source(self):
- source = ConcatSource([RangeSource(0, 4),
- RangeSource(4, 8),
- RangeSource(8, 12),
- RangeSource(12, 16),
- ])
+ source = iobase.ConcatSource([RangeSource(0, 4),
+ RangeSource(4, 8),
+ RangeSource(8, 12),
+ RangeSource(12, 16),
+ ])
self.assertEqual(list(source.read(source.get_range_tracker())),
range(16))
self.assertEqual(list(source.read(source.get_range_tracker((1, None),
@@ -187,10 +186,10 @@ class SourcesTest(unittest.TestCase):
self.assertEqual(range_tracker.sub_range_tracker(2).try_claim(11), False)
def test_conact_source_exhaustive(self):
- source = ConcatSource([RangeSource(0, 10),
- RangeSource(100, 110),
- RangeSource(1000, 1010),
- ])
+ source = iobase.ConcatSource([RangeSource(0, 10),
+ RangeSource(100, 110),
+ RangeSource(1000, 1010),
+ ])
source_test_utils.assertSplitAtFractionExhaustive(source)
if __name__ == '__main__':