You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/09/23 20:54:30 UTC
[1/6] incubator-beam git commit: Move ConcatSource into its own module.
Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk 0fa9c4be6 -> 753cc9c2e
Move ConcatSource into its own module.
Also added some more tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d189f83e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d189f83e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d189f83e
Branch: refs/heads/python-sdk
Commit: d189f83e4fb89e7cdc36e267bdf4d30c35bb03e8
Parents: 0ffe7cc
Author: Robert Bradshaw <ro...@google.com>
Authored: Thu Sep 22 15:34:04 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Fri Sep 23 13:44:57 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/concat_source.py | 264 +++++++++++++++++++
.../python/apache_beam/io/concat_source_test.py | 222 ++++++++++++++++
sdks/python/apache_beam/io/filebasedsource.py | 3 +-
.../apache_beam/io/filebasedsource_test.py | 2 +-
sdks/python/apache_beam/io/iobase.py | 234 ----------------
sdks/python/apache_beam/io/source_test_utils.py | 2 +-
sdks/python/apache_beam/io/sources_test.py | 100 -------
7 files changed, 490 insertions(+), 337 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d189f83e/sdks/python/apache_beam/io/concat_source.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/concat_source.py b/sdks/python/apache_beam/io/concat_source.py
new file mode 100644
index 0000000..f2bd238
--- /dev/null
+++ b/sdks/python/apache_beam/io/concat_source.py
@@ -0,0 +1,264 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Concat Source, which reads the union of several other sources.
+"""
+
+import bisect
+import threading
+
+from apache_beam.io import iobase
+
+
class ConcatSource(iobase.BoundedSource):
  """A ``BoundedSource`` that reads, in order, the union of several sources.

  Sub-sources may be given either as plain ``BoundedSource`` objects or as
  ``iobase.SourceBundle`` instances; plain sources are wrapped in a bundle
  with no weight and ``None`` start/stop positions.

  Primarily for internal use, use the ``apache_beam.Flatten`` transform
  to create the union of several reads.
  """

  def __init__(self, sources):
    bundles = []
    for src in sources:
      if not isinstance(src, iobase.SourceBundle):
        src = iobase.SourceBundle(None, src, None, None)
      bundles.append(src)
    self._source_bundles = bundles

  @property
  def sources(self):
    """The underlying sub-sources, unwrapped from their bundles."""
    return [bundle.source for bundle in self._source_bundles]

  def estimate_size(self):
    """Returns the sum of the sub-sources' estimated sizes."""
    total = 0
    for bundle in self._source_bundles:
      total += bundle.source.estimate_size()
    return total

  def split(
      self, desired_bundle_size=None, start_position=None, stop_position=None):
    """Splits every sub-source and yields all of their bundles in order.

    Since this is a generator, the ``ValueError`` below is raised when the
    result is first iterated, not at call time.

    Raises:
      ValueError: if ``start_position`` or ``stop_position`` is truthy;
        splitting only part of a ConcatSource is not supported.
    """
    if start_position or stop_position:
      raise ValueError(
          'Multi-level initial splitting is not supported. Expected start and '
          'stop positions to be None. Received %r and %r respectively.' %
          (start_position, stop_position))

    for bundle in self._source_bundles:
      # All sub-sources are assumed to express bundle weights in the same
      # unit (e.g. every one of them reports a size in bytes).
      sub_bundles = bundle.source.split(
          desired_bundle_size, bundle.start_position, bundle.stop_position)
      for sub_bundle in sub_bundles:
        yield sub_bundle

  def get_range_tracker(self, start_position=None, stop_position=None):
    """Returns a ``ConcatRangeTracker`` over the given position range.

    ``None`` bounds default to the start of the first source and the end of
    the last source respectively.
    """
    start = (0, None) if start_position is None else start_position
    stop = ((len(self._source_bundles), None) if stop_position is None
            else stop_position)
    return ConcatRangeTracker(start, stop, self._source_bundles)

  def read(self, range_tracker):
    """Yields the records of each sub-source covered by ``range_tracker``."""
    first_ix, _ = range_tracker.start_position()
    last_ix, last_pos = range_tracker.stop_position()
    # A stop position inside a source (non-None offset) means that source is
    # partially read as well, so the exclusive bound moves one further.
    end_ix = last_ix if last_pos is None else last_ix + 1
    for ix in range(first_ix, end_ix):
      if not range_tracker.try_claim((ix, None)):
        return
      sub_tracker = range_tracker.sub_range_tracker(ix)
      for record in self._source_bundles[ix].source.read(sub_tracker):
        yield record

  def default_output_coder(self):
    """Returns the first sub-source's coder, or the base default if empty."""
    if not self._source_bundles:
      # No sub-sources: fall back to the BoundedSource default (PickleCoder).
      return super(ConcatSource, self).default_output_coder()
    # Assumes every sub-source produces the same coder.
    return self._source_bundles[0].source.default_output_coder()
+
+
class ConcatRangeTracker(iobase.RangeTracker):
  """Range tracker for ConcatSource.

  Positions are ``(source_index, source_position)`` tuples. A
  ``source_position`` of ``None`` denotes the boundary just before source
  ``source_index``. Any other value is a position inside that sub-source,
  interpreted by the sub-source's own range tracker.

  Global fractions are mapped onto sub-sources through
  ``self._cumulative_weights``. Entry ``i`` is the global fraction at which
  source ``i`` begins, and one extra trailing entry marks the end.
  """

  def __init__(self, start, end, source_bundles):
    """Initializes ``ConcatRangeTracker``

    Args:
      start: start position, a tuple of (source_index, source_position)
      end: end position, a tuple of (source_index, source_position)
      source_bundles: the list of source bundles in the ConcatSource
    """
    super(ConcatRangeTracker, self).__init__()
    self._start = start
    self._end = end
    self._source_bundles = source_bundles
    # Reentrant because methods holding the lock (e.g. try_claim) call
    # sub_range_tracker, which may acquire it again.
    self._lock = threading.RLock()
    # Lazily-initialized list of RangeTrackers corresponding to each source.
    self._range_trackers = [None] * len(source_bundles)
    # The currently-being-iterated-over (and latest claimed) source.
    self._claimed_source_ix = self._start[0]
    # Now compute cumulative progress through the sources for converting
    # between global fractions and fractions within specific sources.
    # TODO(robertwb): Implement fraction-at-position to properly scale
    # partial start and end sources.
    # Note, however, that in practice splits are typically on source
    # boundaries anyways.
    last = self._last_source_ix()
    # One entry per source plus one for the end. Sources before the start
    # are pinned to 0, and entries from index ``last`` onwards are 1.
    self._cumulative_weights = (
        [0] * start[0]
        + self._compute_cumulative_weights(source_bundles[start[0]:last])
        + [1] * (len(source_bundles) - last))

  def _last_source_ix(self):
    """Returns the index one past the last source that may yield records.

    A stop position inside a source (non-``None`` offset) means that source
    is partially included, so the exclusive bound moves one further.
    """
    return self._end[0] if self._end[1] is None else self._end[0] + 1

  @staticmethod
  def _compute_cumulative_weights(source_bundles):
    """Returns running totals of normalized weights for ``source_bundles``.

    The result has ``len(source_bundles) + 1`` entries, running from 0 to 1,
    with adjacent entries forced to differ. For example, weights
    ``[100, 20, 3]`` give ``[0.0, 100/123, 120/123, 1.0]``. An empty input
    gives ``[1]``.
    """
    # Two adjacent sources must differ so that they can be uniquely
    # identified by a single global fraction. Let min_diff be the
    # smallest allowable difference between sources.
    min_diff = 1e-5
    # For the computation below, we need weights for all sources.
    # Substitute average weights for those whose weights are
    # unspecified (or 1.0 for everything if none are known).
    known = [s.weight for s in source_bundles if s.weight is not None]
    # float() prevents truncating integer division under Python 2.
    avg = float(sum(known)) / len(known) if known else 1.0
    weights = [s.weight or avg for s in source_bundles]

    total = float(sum(weights))
    if total == 0:
      # Every weight is zero, so dividing by the total below would raise
      # ZeroDivisionError. Weigh all sources equally instead.
      weights = [1.0] * len(weights)
      total = float(len(weights))

    running_total = [0]
    for w in weights:
      running_total.append(
          max(min_diff, min(1, running_total[-1] + w / total)))
    running_total[-1] = 1  # In case of rounding error.
    # Due to rounding error or greatly differing sizes, two adjacent running
    # totals may coincide. Shrink every earlier total slightly so that
    # adjacent entries stay distinct.
    for k in range(1, len(running_total)):
      if running_total[k] == running_total[k - 1]:
        for j in range(k):
          running_total[j] *= (1 - min_diff)
    return running_total

  def start_position(self):
    return self._start

  def stop_position(self):
    return self._end

  def try_claim(self, pos):
    """Attempts to claim ``pos``, a ``(source_index, source_position)`` tuple.

    Claiming ``(ix, None)`` marks source ``ix`` as the current source and
    succeeds while ``ix`` is in range. Any other position is delegated to
    that source's sub-tracker.

    Returns:
      True if the position was claimed, False if it lies at or past the end.
    """
    source_ix, source_pos = pos
    with self._lock:
      if source_ix > self._end[0]:
        return False
      elif source_ix == self._end[0] and self._end[1] is None:
        return False
      else:
        assert source_ix >= self._claimed_source_ix
        self._claimed_source_ix = source_ix
        if source_pos is None:
          return True
        else:
          return self.sub_range_tracker(source_ix).try_claim(source_pos)

  def try_split(self, pos):
    """Attempts to truncate this range at ``pos``.

    A split in a not-yet-claimed source happens on that source's boundary.
    A split in the currently claimed source is delegated to its sub-tracker.

    Returns:
      ``((source_ix, split_pos), fraction)`` on success, where ``fraction``
      is the global fraction at the split point; otherwise None.
    """
    source_ix, source_pos = pos
    with self._lock:
      if source_ix < self._claimed_source_ix:
        # Already claimed.
        return None
      elif source_ix > self._end[0]:
        # After end.
        return None
      elif source_ix == self._end[0] and self._end[1] is None:
        # At/after end.
        return None
      else:
        if source_ix > self._claimed_source_ix:
          # Prefer to split on even boundary.
          split_pos = None
          ratio = self._cumulative_weights[source_ix]
        else:
          # Split the current subsource.
          split = self.sub_range_tracker(source_ix).try_split(
              source_pos)
          if not split:
            return None
          split_pos, frac = split
          ratio = self.local_to_global(source_ix, frac)

        self._end = source_ix, split_pos
        # Rescale so that the new end corresponds to global fraction 1.
        self._cumulative_weights = [min(w / ratio, 1)
                                    for w in self._cumulative_weights]
        return (source_ix, split_pos), ratio

  def set_current_position(self, pos):
    raise NotImplementedError('Should only be called on sub-trackers')

  def position_at_fraction(self, fraction):
    """Returns the ``(source_index, source_position)`` at global ``fraction``."""
    source_ix, source_frac = self.global_to_local(fraction)
    if source_ix == self._last_source_ix():
      return (source_ix, None)
    else:
      return (source_ix,
              self.sub_range_tracker(source_ix).position_at_fraction(
                  source_frac))

  def fraction_consumed(self):
    """Returns the global fraction consumed, based on the current source."""
    with self._lock:
      return self.local_to_global(
          self._claimed_source_ix,
          self.sub_range_tracker(self._claimed_source_ix)
          .fraction_consumed())

  def local_to_global(self, source_ix, source_frac):
    """Converts a fraction within ``source_ix`` to a global fraction."""
    cw = self._cumulative_weights
    # The global fraction is the fraction to source_ix plus some portion of
    # the way towards the next source.
    return cw[source_ix] + source_frac * (cw[source_ix + 1] - cw[source_ix])

  def global_to_local(self, frac):
    """Converts a global fraction to ``(source_index, local_fraction)``.

    ``frac == 1`` maps to ``(last, None)``, where ``last`` is one past the
    final source in range.
    """
    if frac == 1:
      return (self._last_source_ix(), None)
    else:
      cw = self._cumulative_weights
      # Find the last source that starts at or before frac.
      source_ix = bisect.bisect(cw, frac) - 1
      # Return this source, converting what's left of frac after starting
      # this source into a value in [0.0, 1.0) representing how far we are
      # towards the next source.
      return (source_ix,
              (frac - cw[source_ix]) / (cw[source_ix + 1] - cw[source_ix]))

  def sub_range_tracker(self, source_ix):
    """Returns (creating on first use) the range tracker for ``source_ix``.

    The tracker is bounded by this tracker's start/end positions when
    ``source_ix`` is the first/last source. Otherwise it uses the bundle's
    own start/stop positions.
    """
    assert self._start[0] <= source_ix <= self._end[0]
    if self._range_trackers[source_ix] is None:
      with self._lock:
        # Re-check under the lock so that concurrent callers create at most
        # one tracker per source.
        if self._range_trackers[source_ix] is None:
          source = self._source_bundles[source_ix]
          if source_ix == self._start[0] and self._start[1] is not None:
            start = self._start[1]
          else:
            start = source.start_position
          if source_ix == self._end[0] and self._end[1] is not None:
            stop = self._end[1]
          else:
            stop = source.stop_position
          self._range_trackers[source_ix] = source.source.get_range_tracker(
              start, stop)
    return self._range_trackers[source_ix]
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d189f83e/sdks/python/apache_beam/io/concat_source_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/concat_source_test.py b/sdks/python/apache_beam/io/concat_source_test.py
new file mode 100644
index 0000000..828bdb0
--- /dev/null
+++ b/sdks/python/apache_beam/io/concat_source_test.py
@@ -0,0 +1,222 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the sources framework."""
+
+import logging
+import unittest
+
+import apache_beam as beam
+
+from apache_beam.io import iobase
+from apache_beam.io import range_trackers
+from apache_beam.io import source_test_utils
+from apache_beam.io.concat_source import ConcatSource
+from apache_beam.transforms.util import assert_that
+from apache_beam.transforms.util import equal_to
+
+
class RangeSource(iobase.BoundedSource):
  """Test-only ``BoundedSource`` over the integers in ``[start, end)``.

  Only multiples of ``split_freq`` are claimed and emitted. ``split_freq``
  therefore controls how finely a read can be split dynamically.
  """

  def __init__(self, start, end, split_freq=1):
    assert start <= end
    self._start = start
    self._end = end
    self._split_freq = split_freq

  def _normalize(self, start_position, end_position):
    """Replaces ``None`` bounds with this source's own start/end."""
    return (self._start if start_position is None else start_position,
            self._end if end_position is None else end_position)

  def _round_up(self, index):
    """Rounds up to the nearest multiple of split_freq."""
    return index - index % -self._split_freq

  def estimate_size(self):
    return self._end - self._start

  def split(self, desired_bundle_size, start_position=None, end_position=None):
    """Yields bundles of at most ``desired_bundle_size`` consecutive values.

    Each bundle's weight is its number of values. Bundles never extend past
    the requested ``end_position``.
    """
    start, end = self._normalize(start_position, end_position)
    for sub_start in range(start, end, desired_bundle_size):
      # Clamp to the requested end (not self._end) so that a caller-supplied
      # end_position is respected.
      sub_end = min(end, sub_start + desired_bundle_size)
      yield iobase.SourceBundle(
          sub_end - sub_start,
          RangeSource(sub_start, sub_end, self._split_freq),
          None, None)

  def get_range_tracker(self, start_position, end_position):
    start, end = self._normalize(start_position, end_position)
    return range_trackers.OffsetRangeTracker(start, end)

  def read(self, range_tracker):
    """Claims and yields each multiple of split_freq in the tracker's range.

    Stops at the first failed claim.
    """
    for k in range(self._round_up(range_tracker.start_position()),
                   self._round_up(range_tracker.stop_position())):
      if k % self._split_freq == 0:
        if not range_tracker.try_claim(k):
          return
        yield k
+
+
+class ConcatSourceTest(unittest.TestCase):
+ """Tests for ConcatSource and its ConcatRangeTracker."""
+
+ def test_range_source(self):
+ # Sanity-check the RangeSource helper on its own before using it as a
+ # sub-source below.
+ source_test_utils.assertSplitAtFractionExhaustive(RangeSource(0, 10, 3))
+
+ def test_conact_source(self):
+ # Four equally sized sub-sources with no explicit weights, so each one
+ # spans a quarter of the global fraction range.
+ source = ConcatSource([RangeSource(0, 4),
+ RangeSource(4, 8),
+ RangeSource(8, 12),
+ RangeSource(12, 16),
+ ])
+ self.assertEqual(list(source.read(source.get_range_tracker())),
+ range(16))
+ # Start at the beginning of source 1; stop at offset 10 inside source 2.
+ self.assertEqual(list(source.read(source.get_range_tracker((1, None),
+ (2, 10)))),
+ range(4, 10))
+ range_tracker = source.get_range_tracker(None, None)
+ self.assertEqual(range_tracker.position_at_fraction(0), (0, 0))
+ self.assertEqual(range_tracker.position_at_fraction(.5), (2, 8))
+ self.assertEqual(range_tracker.position_at_fraction(.625), (2, 10))
+
+ # Simulate a read.
+ self.assertEqual(range_tracker.try_claim((0, None)), True)
+ self.assertEqual(range_tracker.sub_range_tracker(0).try_claim(2), True)
+ self.assertEqual(range_tracker.fraction_consumed(), 0.125)
+
+ self.assertEqual(range_tracker.try_claim((1, None)), True)
+ self.assertEqual(range_tracker.sub_range_tracker(1).try_claim(6), True)
+ self.assertEqual(range_tracker.fraction_consumed(), 0.375)
+ # Splits before the current claim (earlier source, or an already
+ # passed offset in the current source) must be rejected.
+ self.assertEqual(range_tracker.try_split((0, 1)), None)
+ self.assertEqual(range_tracker.try_split((1, 5)), None)
+
+ # A split inside a not-yet-claimed source happens on its boundary.
+ self.assertEqual(range_tracker.try_split((3, 14)), ((3, None), 0.75))
+ self.assertEqual(range_tracker.try_claim((3, None)), False)
+ self.assertEqual(range_tracker.sub_range_tracker(1).try_claim(7), True)
+ self.assertEqual(range_tracker.try_claim((2, None)), True)
+ self.assertEqual(range_tracker.sub_range_tracker(2).try_claim(9), True)
+
+ # After the first split, fractions are rescaled so that source 3's start
+ # maps to 1.0; splitting inside source 2 at 11 lands at 11/12.
+ self.assertEqual(range_tracker.try_split((2, 8)), None)
+ self.assertEqual(range_tracker.try_split((2, 11)), ((2, 11), 11. / 12))
+ self.assertEqual(range_tracker.sub_range_tracker(2).try_claim(10), True)
+ self.assertEqual(range_tracker.sub_range_tracker(2).try_claim(11), False)
+
+ def test_estimate_size(self):
+ # The estimated size is the sum of the sub-sources' sizes.
+ source = ConcatSource([RangeSource(0, 10),
+ RangeSource(10, 100),
+ RangeSource(100, 1000),
+ ])
+ self.assertEqual(source.estimate_size(), 1000)
+
+ def test_position_at_fration(self):
+ # Explicit bundle weights proportional to each range's size, so global
+ # fractions line up with the concatenated integer values.
+ ranges = [(0, 4), (4, 16), (16, 24), (24, 32)]
+ source = ConcatSource([iobase.SourceBundle((range[1] - range[0]) / 32.,
+ RangeSource(*range),
+ None, None)
+ for range in ranges])
+
+ range_tracker = source.get_range_tracker()
+ self.assertEquals(range_tracker.position_at_fraction(0), (0, 0))
+ self.assertEquals(range_tracker.position_at_fraction(.01), (0, 1))
+ self.assertEquals(range_tracker.position_at_fraction(.1), (0, 4))
+ self.assertEquals(range_tracker.position_at_fraction(.125), (1, 4))
+ self.assertEquals(range_tracker.position_at_fraction(.2), (1, 7))
+ self.assertEquals(range_tracker.position_at_fraction(.7), (2, 23))
+ self.assertEquals(range_tracker.position_at_fraction(.75), (3, 24))
+ self.assertEquals(range_tracker.position_at_fraction(.8), (3, 26))
+ self.assertEquals(range_tracker.position_at_fraction(1), (4, None))
+
+ # A tracker restricted to sources 1 and 2 renormalizes their weights
+ # over the full [0, 1] fraction range.
+ range_tracker = source.get_range_tracker((1, None), (3, None))
+ self.assertEquals(range_tracker.position_at_fraction(0), (1, 4))
+ self.assertEquals(range_tracker.position_at_fraction(.01), (1, 5))
+ self.assertEquals(range_tracker.position_at_fraction(.5), (1, 14))
+ self.assertEquals(range_tracker.position_at_fraction(.599), (1, 16))
+ self.assertEquals(range_tracker.position_at_fraction(.601), (2, 17))
+ self.assertEquals(range_tracker.position_at_fraction(1), (3, None))
+
+ def test_empty_source(self):
+ # No sub-sources, empty sub-sources, and empty position ranges all read
+ # nothing.
+ read_all = source_test_utils.readFromSource
+
+ empty = RangeSource(0, 0)
+ self.assertEquals(read_all(ConcatSource([])), [])
+ self.assertEquals(read_all(ConcatSource([empty])), [])
+ self.assertEquals(read_all(ConcatSource([empty, empty])), [])
+
+ range10 = RangeSource(0, 10)
+ self.assertEquals(read_all(ConcatSource([range10]), (0, None), (0, 0)),
+ [])
+ self.assertEquals(read_all(ConcatSource([range10]), (0, 10), (1, None)),
+ [])
+ self.assertEquals(read_all(ConcatSource([range10, range10]),
+ (0, 10), (1, 0)),
+ [])
+
+ def test_single_source(self):
+ # A single sub-source, read fully and with start/stop offsets inside it.
+ read_all = source_test_utils.readFromSource
+
+ range10 = RangeSource(0, 10)
+ self.assertEquals(read_all(ConcatSource([range10])), range(10))
+ self.assertEquals(read_all(ConcatSource([range10]), (0, 5)), range(5, 10))
+ self.assertEquals(read_all(ConcatSource([range10]), None, (0, 5)),
+ range(5))
+
+ def test_source_with_empty_ranges(self):
+ # Empty sub-sources interleaved with non-empty ones contribute nothing.
+ read_all = source_test_utils.readFromSource
+
+ empty = RangeSource(0, 0)
+ self.assertEquals(read_all(empty), [])
+
+ range10 = RangeSource(0, 10)
+ self.assertEquals(read_all(ConcatSource([empty, empty, range10])),
+ range(10))
+ self.assertEquals(read_all(ConcatSource([empty, range10, empty])),
+ range(10))
+ self.assertEquals(read_all(ConcatSource([range10, empty, range10, empty])),
+ range(10) + range(10))
+
+ def test_source_with_empty_ranges_exhastive(self):
+ # Exhaustive dynamic-split check with empty sub-sources at the start,
+ # middle and end.
+ empty = RangeSource(0, 0)
+ source = ConcatSource([empty,
+ RangeSource(0, 10),
+ empty,
+ empty,
+ RangeSource(10, 13),
+ RangeSource(13, 17),
+ empty,
+ ])
+ source_test_utils.assertSplitAtFractionExhaustive(source)
+
+ def test_run_concat_direct(self):
+ # End-to-end: read a ConcatSource through a pipeline on the direct runner.
+ source = ConcatSource([RangeSource(0, 10),
+ RangeSource(10, 100),
+ RangeSource(100, 1000),
+ ])
+ pipeline = beam.Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | beam.Read(source)
+ assert_that(pcoll, equal_to(range(1000)))
+
+ pipeline.run()
+
+ def test_conact_source_exhaustive(self):
+ # Exhaustive dynamic-split check over non-contiguous sub-source ranges.
+ source = ConcatSource([RangeSource(0, 10),
+ RangeSource(100, 110),
+ RangeSource(1000, 1010),
+ ])
+ source_test_utils.assertSplitAtFractionExhaustive(source)
+
if __name__ == '__main__':
  # Show INFO-level logs while running the tests directly.
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.INFO)
  unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d189f83e/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 2911b9f..8ff69ca 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -28,6 +28,7 @@ For an example implementation of ``FileBasedSource`` see ``avroio.AvroSource``.
from multiprocessing.pool import ThreadPool
from apache_beam.internal import pickler
+from apache_beam.io import concat_source
from apache_beam.io import fileio
from apache_beam.io import iobase
from apache_beam.io import range_trackers
@@ -107,7 +108,7 @@ class FileBasedSource(iobase.BoundedSource):
sizes[index],
min_bundle_size=self._min_bundle_size)
single_file_sources.append(single_file_source)
- self._concat_source = iobase.ConcatSource(single_file_sources)
+ self._concat_source = concat_source.ConcatSource(single_file_sources)
return self._concat_source
def open_file(self, file_name):
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d189f83e/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index 3efcc16..d45a6f9 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -30,7 +30,7 @@ from apache_beam.io import iobase
from apache_beam.io import range_trackers
# importing following private classes for testing
-from apache_beam.io.iobase import ConcatSource
+from apache_beam.io.concat_source import ConcatSource
from apache_beam.io.filebasedsource import _SingleFileSource as SingleFileSource
from apache_beam.io.filebasedsource import FileBasedSource
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d189f83e/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index ac06468..4305fb6 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -31,10 +31,8 @@ the sink.
from collections import namedtuple
-import bisect
import logging
import random
-import threading
import uuid
from apache_beam import pvalue
@@ -1099,235 +1097,3 @@ class _RoundRobinKeyFn(core.DoFn):
if self.counter >= self.count:
self.counter -= self.count
yield self.counter, context.element
-
-
-class ConcatSource(BoundedSource):
- """A ``BoundedSource`` that can group a set of ``BoundedSources``."""
-
- class ConcatRangeTracker(RangeTracker):
-
- def __init__(self, start, end, source_bundles):
- """Initializes ``ConcatRangeTracker``
-
- Args:
- start: start position, a tuple of (source_index, source_position)
- end: end position, a tuple of (source_index, source_position)
- source_bundles: the list of source bundles in the ConcatSource
- """
- super(ConcatSource.ConcatRangeTracker, self).__init__()
- self._start = start
- self._end = end
- self._source_bundles = source_bundles
- self._lock = threading.RLock()
- # Lazily-initialized list of RangeTrackers corresponding to each source.
- self._range_trackers = [None] * len(source_bundles)
- # The currently-being-iterated-over (and latest claimed) source.
- self._claimed_source_ix = self._start[0]
- # Now compute cumulative progress through the sources for converting
- # between global fractions and fractions within specific sources.
- # TODO(robertwb): Implement fraction-at-position to properly scale
- # partial start and end sources.
- # Note, however, that in practice splits are typically on source
- # boundaries anyways.
- last = end[0] if end[1] is None else end[0] + 1
- self._cumulative_weights = (
- [0] * start[0]
- + self._compute_cumulative_weights(source_bundles[start[0]:last])
- + [1] * (len(source_bundles) - last - start[0]))
-
- @staticmethod
- def _compute_cumulative_weights(source_bundles):
- # Two adjacent sources must differ so that they can be uniquely
- # identified by a single global fraction. Let min_diff be the
- # smallest allowable difference between sources.
- min_diff = 1e-5
- # For the computation below, we need weights for all sources.
- # Substitute average weights for those whose weights are
- # unspecified (or 1.0 for everything if none are known).
- known = [s.weight for s in source_bundles if s.weight is not None]
- avg = sum(known) / len(known) if known else 1.0
- weights = [s.weight or avg for s in source_bundles]
-
- # Now compute running totals of the percent done upon reaching
- # each source, with respect to the start and end positions.
- # E.g. if the weights were [100, 20, 3] we would produce
- # [0.0, 100/123, 120/123, 1.0]
- total = float(sum(weights))
- running_total = [0]
- for w in weights:
- running_total.append(
- max(min_diff, min(1, running_total[-1] + w / total)))
- running_total[-1] = 1 # In case of rounding error.
- # There are issues if, due to rouding error or greatly differing sizes,
- # two adjacent running total weights are equal. Normalize this things so
- # that this never happens.
- for k in range(1, len(running_total)):
- if running_total[k] == running_total[k - 1]:
- for j in range(k):
- running_total[j] *= (1 - min_diff)
- return running_total
-
- def start_position(self):
- return self._start
-
- def stop_position(self):
- return self._end
-
- def try_claim(self, pos):
- source_ix, source_pos = pos
- with self._lock:
- if source_ix > self._end[0]:
- return False
- elif source_ix == self._end[0] and self._end[1] is None:
- return False
- else:
- assert source_ix >= self._claimed_source_ix
- self._claimed_source_ix = source_ix
- if source_pos is None:
- return True
- else:
- return self.sub_range_tracker(source_ix).try_claim(source_pos)
-
- def try_split(self, pos):
- source_ix, source_pos = pos
- with self._lock:
- if source_ix < self._claimed_source_ix:
- # Already claimed.
- return None
- elif source_ix > self._end[0]:
- # After end.
- return None
- elif source_ix == self._end[0] and self._end[1] is None:
- # At/after end.
- return None
- else:
- if source_ix > self._claimed_source_ix:
- # Prefer to split on even boundary.
- split_pos = None
- ratio = self._cumulative_weights[source_ix]
- else:
- # Split the current subsource.
- split = self.sub_range_tracker(source_ix).try_split(
- source_pos)
- if not split:
- return None
- split_pos, frac = split
- ratio = self.local_to_global(source_ix, frac)
-
- self._end = source_ix, split_pos
- self._cumulative_weights = [min(w / ratio, 1)
- for w in self._cumulative_weights]
- return (source_ix, split_pos), ratio
-
- def set_current_position(self, pos):
- raise NotImplementedError('Should only be called on sub-trackers')
-
- def position_at_fraction(self, fraction):
- source_ix, source_frac = self.global_to_local(fraction)
- if source_ix == len(self._source_bundles):
- return (source_ix, None)
- else:
- return (source_ix,
- self.sub_range_tracker(source_ix).position_at_fraction(
- source_frac))
-
- def fraction_consumed(self):
- with self._lock:
- return self.local_to_global(
- self._claimed_source_ix,
- self.sub_range_tracker(self._claimed_source_ix)
- .fraction_consumed())
-
- def local_to_global(self, source_ix, source_frac):
- cw = self._cumulative_weights
- # The global fraction is the fraction to source_ix plus some portion of
- # the way towards the next source.
- return cw[source_ix] + source_frac * (cw[source_ix + 1] - cw[source_ix])
-
- def global_to_local(self, frac):
- if frac == 1:
- return (len(self._source_bundles), 0)
- else:
- cw = self._cumulative_weights
- # Find the last source that starts at or before frac.
- source_ix = bisect.bisect(cw, frac) - 1
- # Return this source, converting what's left of frac after starting
- # this source into a value in [0.0, 1.0) representing how far we are
- # towards the next source.
- return (source_ix,
- (frac - cw[source_ix]) / (cw[source_ix + 1] - cw[source_ix]))
-
- def sub_range_tracker(self, source_ix):
- assert self._start[0] <= source_ix <= self._end[0]
- if self._range_trackers[source_ix] is None:
- with self._lock:
- if self._range_trackers[source_ix] is None:
- source = self._source_bundles[source_ix]
- if source_ix == self._start[0] and self._start[1] is not None:
- start = self._start[1]
- else:
- start = source.start_position
- if source_ix == self._end[0] and self._end[1] is not None:
- stop = self._end[1]
- else:
- stop = source.stop_position
- self._range_trackers[source_ix] = source.source.get_range_tracker(
- start, stop)
- return self._range_trackers[source_ix]
-
- def __init__(self, sources):
- self._source_bundles = [source if isinstance(source, SourceBundle)
- else SourceBundle(None, source, None, None)
- for source in sources]
-
- @property
- def sources(self):
- return [s.source for s in self._source_bundles]
-
- def estimate_size(self):
- return sum(s.source.estimate_size() for s in self._source_bundles)
-
- def split(
- self, desired_bundle_size=None, start_position=None, stop_position=None):
- if start_position or stop_position:
- raise ValueError(
- 'Multi-level initial splitting is not supported. Expected start and '
- 'stop positions to be None. Received %r and %r respectively.' %
- (start_position, stop_position))
-
- for source in self._source_bundles:
- # We assume all sub-sources to produce bundles that specify weight using
- # the same unit. For example, all sub-sources may specify the size in
- # bytes as their weight.
- for bundle in source.source.split(
- desired_bundle_size, source.start_position, source.stop_position):
- yield bundle
-
- def get_range_tracker(self, start_position=None, stop_position=None):
- if start_position is None:
- start_position = (0, None)
- if stop_position is None:
- stop_position = (len(self._source_bundles), None)
- return self.ConcatRangeTracker(
- start_position, stop_position, self._source_bundles)
-
- def read(self, range_tracker):
- start_source, _ = range_tracker.start_position()
- stop_source, stop_pos = range_tracker.stop_position()
- if stop_pos is not None:
- stop_source += 1
- for source_ix in range(start_source, stop_source):
- if not range_tracker.try_claim((source_ix, None)):
- break
- for record in self._source_bundles[source_ix].source.read(
- range_tracker.sub_range_tracker(source_ix)):
- yield record
-
- def default_output_coder(self):
- if self._source_bundles:
- # Getting coder from the first sub-sources. This assumes all sub-sources
- # to produce the same coder.
- return self._source_bundles[0].source.default_output_coder()
- else:
- # Defaulting to PickleCoder.
- return super(ConcatSource, self).default_output_coder()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d189f83e/sdks/python/apache_beam/io/source_test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/source_test_utils.py b/sdks/python/apache_beam/io/source_test_utils.py
index f52f16b..13b1e91 100644
--- a/sdks/python/apache_beam/io/source_test_utils.py
+++ b/sdks/python/apache_beam/io/source_test_utils.py
@@ -66,7 +66,7 @@ SplitFractionStatistics = namedtuple(
'successful_fractions non_trivial_fractions')
-def readFromSource(source, start_position, stop_position):
+def readFromSource(source, start_position=None, stop_position=None):
"""Reads elements from the given ```BoundedSource```.
Only reads elements within the given position range.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d189f83e/sdks/python/apache_beam/io/sources_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/sources_test.py b/sdks/python/apache_beam/io/sources_test.py
index e6f369e..35a13f4 100644
--- a/sdks/python/apache_beam/io/sources_test.py
+++ b/sdks/python/apache_beam/io/sources_test.py
@@ -27,7 +27,6 @@ import apache_beam as beam
from apache_beam import coders
from apache_beam.io import iobase
from apache_beam.io import range_trackers
-from apache_beam.io import source_test_utils
from apache_beam.transforms.util import assert_that
from apache_beam.transforms.util import equal_to
@@ -81,47 +80,6 @@ class LineSource(iobase.BoundedSource):
return coders.BytesCoder()
-class RangeSource(iobase.BoundedSource):
-
- def __init__(self, start, end, split_freq=1):
- assert start <= end
- self._start = start
- self._end = end
- self._split_freq = split_freq
-
- def _normalize(self, start_position, end_position):
- return (self._start if start_position is None else start_position,
- self._end if end_position is None else end_position)
-
- def _round_up(self, index):
- """Rounds up to the nearest mulitple of split_freq."""
- return index - index % -self._split_freq
-
- def estimate_size(self):
- return self._stop - self._start
-
- def split(self, desired_bundle_size, start_position=None, end_position=None):
- start, end = self._normalize(start_position, end_position)
- for sub_start in range(start, end, desired_bundle_size):
- sub_end = min(self._end, sub_start + desired_bundle_size)
- yield iobase.SourceBundle(
- sub_end - sub_start,
- RangeSource(sub_start, sub_end, self._split_freq),
- None, None)
-
- def get_range_tracker(self, start_position, end_position):
- start, end = self._normalize(start_position, end_position)
- return range_trackers.OffsetRangeTracker(start, end)
-
- def read(self, range_tracker):
- for k in range(self._round_up(range_tracker.start_position()),
- self._round_up(range_tracker.stop_position())):
- if k % self._split_freq == 0:
- if not range_tracker.try_claim(k):
- return
- yield k
-
-
class SourcesTest(unittest.TestCase):
def _create_temp_file(self, contents):
@@ -146,64 +104,6 @@ class SourcesTest(unittest.TestCase):
pipeline.run()
- def test_range_source(self):
- source_test_utils.assertSplitAtFractionExhaustive(RangeSource(0, 10, 3))
-
- def test_conact_source(self):
- source = iobase.ConcatSource([RangeSource(0, 4),
- RangeSource(4, 8),
- RangeSource(8, 12),
- RangeSource(12, 16),
- ])
- self.assertEqual(list(source.read(source.get_range_tracker())),
- range(16))
- self.assertEqual(list(source.read(source.get_range_tracker((1, None),
- (2, 10)))),
- range(4, 10))
- range_tracker = source.get_range_tracker(None, None)
- self.assertEqual(range_tracker.position_at_fraction(0), (0, 0))
- self.assertEqual(range_tracker.position_at_fraction(.5), (2, 8))
- self.assertEqual(range_tracker.position_at_fraction(.625), (2, 10))
-
- # Simulate a read.
- self.assertEqual(range_tracker.try_claim((0, None)), True)
- self.assertEqual(range_tracker.sub_range_tracker(0).try_claim(2), True)
- self.assertEqual(range_tracker.fraction_consumed(), 0.125)
-
- self.assertEqual(range_tracker.try_claim((1, None)), True)
- self.assertEqual(range_tracker.sub_range_tracker(1).try_claim(6), True)
- self.assertEqual(range_tracker.fraction_consumed(), 0.375)
- self.assertEqual(range_tracker.try_split((0, 1)), None)
- self.assertEqual(range_tracker.try_split((1, 5)), None)
-
- self.assertEqual(range_tracker.try_split((3, 14)), ((3, None), 0.75))
- self.assertEqual(range_tracker.try_claim((3, None)), False)
- self.assertEqual(range_tracker.sub_range_tracker(1).try_claim(7), True)
- self.assertEqual(range_tracker.try_claim((2, None)), True)
- self.assertEqual(range_tracker.sub_range_tracker(2).try_claim(9), True)
-
- self.assertEqual(range_tracker.try_split((2, 8)), None)
- self.assertEqual(range_tracker.try_split((2, 11)), ((2, 11), 11. / 12))
- self.assertEqual(range_tracker.sub_range_tracker(2).try_claim(10), True)
- self.assertEqual(range_tracker.sub_range_tracker(2).try_claim(11), False)
-
- def test_run_concat_direct(self):
- source = iobase.ConcatSource([RangeSource(0, 10),
- RangeSource(10, 100),
- RangeSource(100, 1000),
- ])
- pipeline = beam.Pipeline('DirectPipelineRunner')
- pcoll = pipeline | beam.Read(source)
- assert_that(pcoll, equal_to(range(1000)))
-
- pipeline.run()
-
- def test_conact_source_exhaustive(self):
- source = iobase.ConcatSource([RangeSource(0, 10),
- RangeSource(100, 110),
- RangeSource(1000, 1010),
- ])
- source_test_utils.assertSplitAtFractionExhaustive(source)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
[2/6] incubator-beam git commit: Allow ConcatSource to take SourceBundles rather than raw Sources
Posted by ro...@apache.org.
Allow ConcatSource to take SourceBundles rather than raw Sources
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0ffe7ccf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0ffe7ccf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0ffe7ccf
Branch: refs/heads/python-sdk
Commit: 0ffe7ccffd46c2e0505d723cca4305bbc7fb8940
Parents: d65d17a
Author: Robert Bradshaw <ro...@google.com>
Authored: Tue Sep 13 17:21:20 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Fri Sep 23 13:44:57 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/iobase.py | 136 ++++++++++++++++--------
sdks/python/apache_beam/io/sources_test.py | 21 +++-
2 files changed, 110 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0ffe7ccf/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 1205a26..ac06468 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -1106,34 +1106,66 @@ class ConcatSource(BoundedSource):
class ConcatRangeTracker(RangeTracker):
- def __init__(self, start, end, sources, weights=None):
+ def __init__(self, start, end, source_bundles):
+ """Initializes ``ConcatRangeTracker``
+
+ Args:
+ start: start position, a tuple of (source_index, source_position)
+ end: end position, a tuple of (source_index, source_position)
+ source_bundles: the list of source bundles in the ConcatSource
+ """
super(ConcatSource.ConcatRangeTracker, self).__init__()
self._start = start
self._end = end
+ self._source_bundles = source_bundles
self._lock = threading.RLock()
- self._sources = sources
- self._range_trackers = [None] * len(self._sources)
+ # Lazily-initialized list of RangeTrackers corresponding to each source.
+ self._range_trackers = [None] * len(source_bundles)
+ # The currently-being-iterated-over (and latest claimed) source.
self._claimed_source_ix = self._start[0]
-
- if weights is None:
- n = max(1, end[0] - start[0])
- self._cumulative_weights = [
- max(0, min(1, float(k) / n))
- for k in range(-start[0], len(self._sources) - start[0] + 1)]
- else:
- assert len(sources) == len(weights)
- relevant_weights = weights[start[0]:end[0] + 1]
- # TODO(robertwb): Implement fraction-at-position to properly scale
- # partial start and end sources.
- total = sum(relevant_weights)
- running_total = [0]
- for w in relevant_weights:
- running_total.append(max(1, running_total[-1] + w / total))
- running_total[-1] = 1 # In case of rounding error.
- self._cumulative_weights = (
+ # Now compute cumulative progress through the sources for converting
+ # between global fractions and fractions within specific sources.
+ # TODO(robertwb): Implement fraction-at-position to properly scale
+ # partial start and end sources.
+ # Note, however, that in practice splits are typically on source
+ # boundaries anyways.
+ last = end[0] if end[1] is None else end[0] + 1
+ self._cumulative_weights = (
[0] * start[0]
- + running_total
- + [1] * (len(self._sources) - end[0]))
+ + self._compute_cumulative_weights(source_bundles[start[0]:last])
+ + [1] * (len(source_bundles) - last - start[0]))
+
+ @staticmethod
+ def _compute_cumulative_weights(source_bundles):
+ # Two adjacent sources must differ so that they can be uniquely
+ # identified by a single global fraction. Let min_diff be the
+ # smallest allowable difference between sources.
+ min_diff = 1e-5
+ # For the computation below, we need weights for all sources.
+ # Substitute average weights for those whose weights are
+ # unspecified (or 1.0 for everything if none are known).
+ known = [s.weight for s in source_bundles if s.weight is not None]
+ avg = sum(known) / len(known) if known else 1.0
+ weights = [s.weight or avg for s in source_bundles]
+
+ # Now compute running totals of the percent done upon reaching
+ # each source, with respect to the start and end positions.
+ # E.g. if the weights were [100, 20, 3] we would produce
+ # [0.0, 100/123, 120/123, 1.0]
+ total = float(sum(weights))
+ running_total = [0]
+ for w in weights:
+ running_total.append(
+ max(min_diff, min(1, running_total[-1] + w / total)))
+ running_total[-1] = 1 # In case of rounding error.
+ # There are issues if, due to rouding error or greatly differing sizes,
+ # two adjacent running total weights are equal. Normalize this things so
+ # that this never happens.
+ for k in range(1, len(running_total)):
+ if running_total[k] == running_total[k - 1]:
+ for j in range(k):
+ running_total[j] *= (1 - min_diff)
+ return running_total
def start_position(self):
return self._start
@@ -1149,6 +1181,7 @@ class ConcatSource(BoundedSource):
elif source_ix == self._end[0] and self._end[1] is None:
return False
else:
+ assert source_ix >= self._claimed_source_ix
self._claimed_source_ix = source_ix
if source_pos is None:
return True
@@ -1183,7 +1216,7 @@ class ConcatSource(BoundedSource):
self._end = source_ix, split_pos
self._cumulative_weights = [min(w / ratio, 1)
- for w in self._cumulative_weights]
+ for w in self._cumulative_weights]
return (source_ix, split_pos), ratio
def set_current_position(self, pos):
@@ -1191,7 +1224,7 @@ class ConcatSource(BoundedSource):
def position_at_fraction(self, fraction):
source_ix, source_frac = self.global_to_local(fraction)
- if source_ix == len(self._sources):
+ if source_ix == len(self._source_bundles):
return (source_ix, None)
else:
return (source_ix,
@@ -1200,21 +1233,27 @@ class ConcatSource(BoundedSource):
def fraction_consumed(self):
with self._lock:
- return self.local_to_global(self._claimed_source_ix,
- self.sub_range_tracker(
- self._claimed_source_ix)
- .fraction_consumed())
+ return self.local_to_global(
+ self._claimed_source_ix,
+ self.sub_range_tracker(self._claimed_source_ix)
+ .fraction_consumed())
def local_to_global(self, source_ix, source_frac):
cw = self._cumulative_weights
+ # The global fraction is the fraction to source_ix plus some portion of
+ # the way towards the next source.
return cw[source_ix] + source_frac * (cw[source_ix + 1] - cw[source_ix])
def global_to_local(self, frac):
if frac == 1:
- return (len(self._sources), 0)
+ return (len(self._source_bundles), 0)
else:
cw = self._cumulative_weights
+ # Find the last source that starts at or before frac.
source_ix = bisect.bisect(cw, frac) - 1
+ # Return this source, converting what's left of frac after starting
+ # this source into a value in [0.0, 1.0) representing how far we are
+ # towards the next source.
return (source_ix,
(frac - cw[source_ix]) / (cw[source_ix + 1] - cw[source_ix]))
@@ -1223,21 +1262,30 @@ class ConcatSource(BoundedSource):
if self._range_trackers[source_ix] is None:
with self._lock:
if self._range_trackers[source_ix] is None:
- self._range_trackers[source_ix] = (
- self._sources[source_ix].get_range_tracker(
- self._start[1] if source_ix == self._start[0] else None,
- self._end[1] if source_ix == self._end[0] else None))
+ source = self._source_bundles[source_ix]
+ if source_ix == self._start[0] and self._start[1] is not None:
+ start = self._start[1]
+ else:
+ start = source.start_position
+ if source_ix == self._end[0] and self._end[1] is not None:
+ stop = self._end[1]
+ else:
+ stop = source.stop_position
+ self._range_trackers[source_ix] = source.source.get_range_tracker(
+ start, stop)
return self._range_trackers[source_ix]
def __init__(self, sources):
- self._sources = sources
+ self._source_bundles = [source if isinstance(source, SourceBundle)
+ else SourceBundle(None, source, None, None)
+ for source in sources]
@property
def sources(self):
- return self._sources
+ return [s.source for s in self._source_bundles]
def estimate_size(self):
- return sum(s.estimate_size() for s in self._sources)
+ return sum(s.source.estimate_size() for s in self._source_bundles)
def split(
self, desired_bundle_size=None, start_position=None, stop_position=None):
@@ -1247,19 +1295,21 @@ class ConcatSource(BoundedSource):
'stop positions to be None. Received %r and %r respectively.' %
(start_position, stop_position))
- for source in self._sources:
+ for source in self._source_bundles:
# We assume all sub-sources to produce bundles that specify weight using
# the same unit. For example, all sub-sources may specify the size in
# bytes as their weight.
- for bundle in source.split(desired_bundle_size, None, None):
+ for bundle in source.source.split(
+ desired_bundle_size, source.start_position, source.stop_position):
yield bundle
def get_range_tracker(self, start_position=None, stop_position=None):
if start_position is None:
start_position = (0, None)
if stop_position is None:
- stop_position = (len(self._sources), None)
- return self.ConcatRangeTracker(start_position, stop_position, self._sources)
+ stop_position = (len(self._source_bundles), None)
+ return self.ConcatRangeTracker(
+ start_position, stop_position, self._source_bundles)
def read(self, range_tracker):
start_source, _ = range_tracker.start_position()
@@ -1269,15 +1319,15 @@ class ConcatSource(BoundedSource):
for source_ix in range(start_source, stop_source):
if not range_tracker.try_claim((source_ix, None)):
break
- for record in self._sources[source_ix].read(
+ for record in self._source_bundles[source_ix].source.read(
range_tracker.sub_range_tracker(source_ix)):
yield record
def default_output_coder(self):
- if self._sources:
+ if self._source_bundles:
# Getting coder from the first sub-sources. This assumes all sub-sources
# to produce the same coder.
- return self._sources[0].default_output_coder()
+ return self._source_bundles[0].source.default_output_coder()
else:
# Defaulting to PickleCoder.
return super(ConcatSource, self).default_output_coder()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0ffe7ccf/sdks/python/apache_beam/io/sources_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/sources_test.py b/sdks/python/apache_beam/io/sources_test.py
index 4d16560..e6f369e 100644
--- a/sdks/python/apache_beam/io/sources_test.py
+++ b/sdks/python/apache_beam/io/sources_test.py
@@ -104,8 +104,10 @@ class RangeSource(iobase.BoundedSource):
start, end = self._normalize(start_position, end_position)
for sub_start in range(start, end, desired_bundle_size):
sub_end = min(self._end, sub_start + desired_bundle_size)
- yield SourceBundle(RangeSource(sub_start, sub_end, self._split_freq),
- sub_end - sub_start)
+ yield iobase.SourceBundle(
+ sub_end - sub_start,
+ RangeSource(sub_start, sub_end, self._split_freq),
+ None, None)
def get_range_tracker(self, start_position, end_position):
start, end = self._normalize(start_position, end_position)
@@ -152,7 +154,7 @@ class SourcesTest(unittest.TestCase):
RangeSource(4, 8),
RangeSource(8, 12),
RangeSource(12, 16),
- ])
+ ])
self.assertEqual(list(source.read(source.get_range_tracker())),
range(16))
self.assertEqual(list(source.read(source.get_range_tracker((1, None),
@@ -185,11 +187,22 @@ class SourcesTest(unittest.TestCase):
self.assertEqual(range_tracker.sub_range_tracker(2).try_claim(10), True)
self.assertEqual(range_tracker.sub_range_tracker(2).try_claim(11), False)
+ def test_run_concat_direct(self):
+ source = iobase.ConcatSource([RangeSource(0, 10),
+ RangeSource(10, 100),
+ RangeSource(100, 1000),
+ ])
+ pipeline = beam.Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | beam.Read(source)
+ assert_that(pcoll, equal_to(range(1000)))
+
+ pipeline.run()
+
def test_conact_source_exhaustive(self):
source = iobase.ConcatSource([RangeSource(0, 10),
RangeSource(100, 110),
RangeSource(1000, 1010),
- ])
+ ])
source_test_utils.assertSplitAtFractionExhaustive(source)
if __name__ == '__main__':
[6/6] incubator-beam git commit: Closes #955
Posted by ro...@apache.org.
Closes #955
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/753cc9c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/753cc9c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/753cc9c2
Branch: refs/heads/python-sdk
Commit: 753cc9c2e432beb4175902b28b3f9f788bc87f14
Parents: 0fa9c4b d189f83
Author: Robert Bradshaw <ro...@google.com>
Authored: Fri Sep 23 13:50:02 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Fri Sep 23 13:50:02 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/concat_source.py | 264 +++++++++++++++++++
.../python/apache_beam/io/concat_source_test.py | 222 ++++++++++++++++
sdks/python/apache_beam/io/filebasedsource.py | 81 +-----
.../apache_beam/io/filebasedsource_test.py | 12 +-
sdks/python/apache_beam/io/iobase.py | 4 +-
sdks/python/apache_beam/io/range_trackers.py | 2 +-
sdks/python/apache_beam/io/source_test_utils.py | 27 +-
sdks/python/apache_beam/io/sources_test.py | 1 +
8 files changed, 521 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
[3/6] incubator-beam git commit: Minor cleanups in docstrings and error messages.
Posted by ro...@apache.org.
Minor cleanups in docstrings and error messages.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c1f42e62
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c1f42e62
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c1f42e62
Branch: refs/heads/python-sdk
Commit: c1f42e6203de8d0576dbbf324c22edf6aac0ca64
Parents: 0fa9c4b
Author: Robert Bradshaw <ro...@google.com>
Authored: Tue Sep 13 16:25:33 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Fri Sep 23 13:44:57 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/filebasedsource.py | 25 +++++++++-----------
.../apache_beam/io/filebasedsource_test.py | 10 ++++----
sdks/python/apache_beam/io/iobase.py | 4 ++--
sdks/python/apache_beam/io/range_trackers.py | 2 +-
sdks/python/apache_beam/io/source_test_utils.py | 25 ++++++++++----------
5 files changed, 31 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1f42e62/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 14de140..5204f04 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -54,8 +54,8 @@ class _ConcatSource(iobase.BoundedSource):
if start_position or stop_position:
raise ValueError(
'Multi-level initial splitting is not supported. Expected start and '
- 'stop positions to be None. Received %r and %r respectively.',
- start_position, stop_position)
+ 'stop positions to be None. Received %r and %r respectively.' %
+ (start_position, stop_position))
for source in self._sources:
# We assume all sub-sources to produce bundles that specify weight using
@@ -224,25 +224,22 @@ class FileBasedSource(iobase.BoundedSource):
class _SingleFileSource(iobase.BoundedSource):
- """Denotes a source for a specific file type.
-
- This should be sub-classed to add support for reading a new file type.
- """
+ """Denotes a source for a specific file type."""
def __init__(self, file_based_source, file_name, start_offset, stop_offset,
min_bundle_size=0):
- if not (isinstance(start_offset, int) or isinstance(start_offset, long)):
- raise ValueError(
- 'start_offset must be a number. Received: %r', start_offset)
+ if not isinstance(start_offset, (int, long)):
+ raise TypeError(
+ 'start_offset must be a number. Received: %r' % start_offset)
if stop_offset != range_trackers.OffsetRangeTracker.OFFSET_INFINITY:
- if not (isinstance(stop_offset, int) or isinstance(stop_offset, long)):
- raise ValueError(
- 'stop_offset must be a number. Received: %r', stop_offset)
+ if not isinstance(stop_offset, (int, long)):
+ raise TypeError(
+ 'stop_offset must be a number. Received: %r' % stop_offset)
if start_offset >= stop_offset:
raise ValueError(
'start_offset must be smaller than stop_offset. Received %d and %d '
- 'for start and stop offsets respectively',
- start_offset, stop_offset)
+ 'for start and stop offsets respectively' %
+ (start_offset, stop_offset))
self._file_name = file_name
self._is_gcs_file = file_name.startswith('gs://') if file_name else False
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1f42e62/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index c4ad026..50f0a22 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -384,19 +384,19 @@ class TestSingleFileSource(unittest.TestCase):
fbs = LineSource('dymmy_pattern')
- with self.assertRaisesRegexp(ValueError, start_not_a_number_error):
+ with self.assertRaisesRegexp(TypeError, start_not_a_number_error):
SingleFileSource(
fbs, file_name='dummy_file', start_offset='aaa', stop_offset='bbb')
- with self.assertRaisesRegexp(ValueError, start_not_a_number_error):
+ with self.assertRaisesRegexp(TypeError, start_not_a_number_error):
SingleFileSource(
fbs, file_name='dummy_file', start_offset='aaa', stop_offset=100)
- with self.assertRaisesRegexp(ValueError, stop_not_a_number_error):
+ with self.assertRaisesRegexp(TypeError, stop_not_a_number_error):
SingleFileSource(
fbs, file_name='dummy_file', start_offset=100, stop_offset='bbb')
- with self.assertRaisesRegexp(ValueError, stop_not_a_number_error):
+ with self.assertRaisesRegexp(TypeError, stop_not_a_number_error):
SingleFileSource(
fbs, file_name='dummy_file', start_offset=100, stop_offset=None)
- with self.assertRaisesRegexp(ValueError, start_not_a_number_error):
+ with self.assertRaisesRegexp(TypeError, start_not_a_number_error):
SingleFileSource(
fbs, file_name='dummy_file', start_offset=None, stop_offset=100)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1f42e62/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index e1f364b..4305fb6 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -545,11 +545,11 @@ class RangeTracker(object):
def start_position(self):
"""Returns the starting position of the current range, inclusive."""
- raise NotImplementedError
+ raise NotImplementedError(type(self))
def stop_position(self):
"""Returns the ending position of the current range, exclusive."""
- raise NotImplementedError
+ raise NotImplementedError(type(self))
def try_claim(self, position): # pylint: disable=unused-argument
"""Atomically determines if a record at a split point is within the range.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1f42e62/sdks/python/apache_beam/io/range_trackers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/range_trackers.py b/sdks/python/apache_beam/io/range_trackers.py
index af6f6c8..080e2f3 100644
--- a/sdks/python/apache_beam/io/range_trackers.py
+++ b/sdks/python/apache_beam/io/range_trackers.py
@@ -138,10 +138,10 @@ class OffsetRangeTracker(iobase.RangeTracker):
return
logging.debug('Agreeing to split %r at %d', self, split_offset)
- self._stop_offset = split_offset
split_fraction = (float(split_offset - self._start_offset) / (
self._stop_offset - self._start_offset))
+ self._stop_offset = split_offset
return self._stop_offset, split_fraction
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1f42e62/sdks/python/apache_beam/io/source_test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/source_test_utils.py b/sdks/python/apache_beam/io/source_test_utils.py
index 4e3a3e3..f52f16b 100644
--- a/sdks/python/apache_beam/io/source_test_utils.py
+++ b/sdks/python/apache_beam/io/source_test_utils.py
@@ -201,43 +201,42 @@ def _assertSplitAtFractionBehavior(
if split_result is not None:
if len(split_result) != 2:
raise ValueError('Split result must be a tuple that contains split '
- 'position and split fraction. Received: %r',
- split_result)
+ 'position and split fraction. Received: %r' %
+ (split_result,))
if range_tracker.stop_position() != split_result[0]:
raise ValueError('After a successful split, the stop position of the '
'RangeTracker must be the same as the returned split '
'position. Observed %r and %r which are different.',
- range_tracker.stop_position(), split_result[0])
+ range_tracker.stop_position() % (split_result[0],))
if split_fraction < 0 or split_fraction > 1:
raise ValueError('Split fraction must be within the range [0,1]',
- 'Observed split fraction was %r.', split_result[1])
+ 'Observed split fraction was %r.' % (split_result[1],))
stop_position_after_split = range_tracker.stop_position()
if split_result and stop_position_after_split == stop_position_before_split:
raise ValueError('Stop position %r did not change after a successful '
- 'split of source %r at fraction %r.',
- stop_position_before_split, source, split_fraction)
+ 'split of source %r at fraction %r.' %
+ (stop_position_before_split, source, split_fraction))
if expected_outcome == ExpectedSplitOutcome.MUST_SUCCEED_AND_BE_CONSISTENT:
if not split_result:
raise ValueError('Expected split of source %r at fraction %r to be '
'successful after reading %d elements. But '
- 'the split failed.',
- source, split_fraction,
- num_items_to_read_before_split)
+ 'the split failed.' %
+ (source, split_fraction, num_items_to_read_before_split))
elif expected_outcome == ExpectedSplitOutcome.MUST_FAIL:
if split_result:
raise ValueError('Expected split of source %r at fraction %r after '
'reading %d elements to fail. But splitting '
- 'succeeded with result %r.',
- source, split_fraction,
- num_items_to_read_before_split, split_result)
+ 'succeeded with result %r.' %
+ (source, split_fraction, num_items_to_read_before_split,
+ split_result))
elif (expected_outcome !=
ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS):
- raise ValueError('Unknown type of expected outcome: %r',
+ raise ValueError('Unknown type of expected outcome: %r'%
expected_outcome)
current_items.extend([value for value in reader_iter])
[5/6] incubator-beam git commit: Move ConcatSource to iobase.
Posted by ro...@apache.org.
Move ConcatSource to iobase.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d65d17aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d65d17aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d65d17aa
Branch: refs/heads/python-sdk
Commit: d65d17aa1e5ecbeed20cbcf0edbe11280acf0262
Parents: c7d5a37
Author: Robert Bradshaw <ro...@google.com>
Authored: Tue Sep 13 16:49:22 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Fri Sep 23 13:44:57 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/filebasedsource.py | 189 +------------------
.../apache_beam/io/filebasedsource_test.py | 2 +-
sdks/python/apache_beam/io/iobase.py | 184 ++++++++++++++++++
sdks/python/apache_beam/io/sources_test.py | 19 +-
4 files changed, 195 insertions(+), 199 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d65d17aa/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index dd91ef2..2911b9f 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -25,9 +25,7 @@ for more details.
For an example implementation of ``FileBasedSource`` see ``avroio.AvroSource``.
"""
-import bisect
from multiprocessing.pool import ThreadPool
-import threading
from apache_beam.internal import pickler
from apache_beam.io import fileio
@@ -37,191 +35,6 @@ from apache_beam.io import range_trackers
MAX_NUM_THREADS_FOR_SIZE_ESTIMATION = 25
-class ConcatSource(iobase.BoundedSource):
- """A ``BoundedSource`` that can group a set of ``BoundedSources``."""
-
- class ConcatRangeTracker(iobase.RangeTracker):
-
- def __init__(self, start, end, sources, weights=None):
- super(ConcatSource.ConcatRangeTracker, self).__init__()
- self._start = start
- self._end = end
- self._lock = threading.RLock()
- self._sources = sources
- self._range_trackers = [None] * len(self._sources)
- self._claimed_source_ix = self._start[0]
-
- if weights is None:
- n = max(1, end[0] - start[0])
- self._cumulative_weights = [
- max(0, min(1, float(k) / n))
- for k in range(-start[0], len(self._sources) - start[0] + 1)]
- else:
- assert len(sources) == len(weights)
- relevant_weights = weights[start[0]:end[0] + 1]
- # TODO(robertwb): Implement fraction-at-position to properly scale
- # partial start and end sources.
- total = sum(relevant_weights)
- running_total = [0]
- for w in relevant_weights:
- running_total.append(max(1, running_total[-1] + w / total))
- running_total[-1] = 1 # In case of rounding error.
- self._cumulative_weights = (
- [0] * start[0]
- + running_total
- + [1] * (len(self._sources) - end[0]))
-
- def start_position(self):
- return self._start
-
- def stop_position(self):
- return self._end
-
- def try_claim(self, pos):
- source_ix, source_pos = pos
- with self._lock:
- if source_ix > self._end[0]:
- return False
- elif source_ix == self._end[0] and self._end[1] is None:
- return False
- else:
- self._claimed_source_ix = source_ix
- if source_pos is None:
- return True
- else:
- return self.sub_range_tracker(source_ix).try_claim(source_pos)
-
- def try_split(self, pos):
- source_ix, source_pos = pos
- with self._lock:
- if source_ix < self._claimed_source_ix:
- # Already claimed.
- return None
- elif source_ix > self._end[0]:
- # After end.
- return None
- elif source_ix == self._end[0] and self._end[1] is None:
- # At/after end.
- return None
- else:
- if source_ix > self._claimed_source_ix:
- # Prefer to split on even boundary.
- split_pos = None
- ratio = self._cumulative_weights[source_ix]
- else:
- # Split the current subsource.
- split = self.sub_range_tracker(source_ix).try_split(
- source_pos)
- if not split:
- return None
- split_pos, frac = split
- ratio = self.local_to_global(source_ix, frac)
-
- self._end = source_ix, split_pos
- self._cumulative_weights = [min(w / ratio, 1)
- for w in self._cumulative_weights]
- return (source_ix, split_pos), ratio
-
- def set_current_position(self, pos):
- raise NotImplementedError('Should only be called on sub-trackers')
-
- def position_at_fraction(self, fraction):
- source_ix, source_frac = self.global_to_local(fraction)
- if source_ix == len(self._sources):
- return (source_ix, None)
- else:
- return (source_ix,
- self.sub_range_tracker(source_ix).position_at_fraction(
- source_frac))
-
- def fraction_consumed(self):
- with self._lock:
- return self.local_to_global(self._claimed_source_ix,
- self.sub_range_tracker(
- self._claimed_source_ix)
- .fraction_consumed())
-
- def local_to_global(self, source_ix, source_frac):
- cw = self._cumulative_weights
- return cw[source_ix] + source_frac * (cw[source_ix + 1] - cw[source_ix])
-
- def global_to_local(self, frac):
- if frac == 1:
- return (len(self._sources), 0)
- else:
- cw = self._cumulative_weights
- source_ix = bisect.bisect(cw, frac) - 1
- return (source_ix,
- (frac - cw[source_ix]) / (cw[source_ix + 1] - cw[source_ix]))
-
- def sub_range_tracker(self, source_ix):
- assert self._start[0] <= source_ix <= self._end[0]
- if self._range_trackers[source_ix] is None:
- with self._lock:
- if self._range_trackers[source_ix] is None:
- self._range_trackers[source_ix] = (
- self._sources[source_ix].get_range_tracker(
- self._start[1] if source_ix == self._start[0] else None,
- self._end[1] if source_ix == self._end[0] else None))
- return self._range_trackers[source_ix]
-
- def __init__(self, sources):
- self._sources = sources
-
- @property
- def sources(self):
- return self._sources
-
- def estimate_size(self):
- return sum(s.estimate_size() for s in self._sources)
-
- def split(
- self, desired_bundle_size=None, start_position=None, stop_position=None):
- if start_position or stop_position:
- raise ValueError(
- 'Multi-level initial splitting is not supported. Expected start and '
- 'stop positions to be None. Received %r and %r respectively.' %
- (start_position, stop_position))
-
- for source in self._sources:
- # We assume all sub-sources to produce bundles that specify weight using
- # the same unit. For example, all sub-sources may specify the size in
- # bytes as their weight.
- for bundle in source.split(desired_bundle_size, None, None):
- yield bundle
-
- def get_range_tracker(self, start_position=None, stop_position=None):
- if start_position is None:
- start_position = (0, None)
- if stop_position is None:
- stop_position = (len(self._sources), None)
- return self.ConcatRangeTracker(start_position, stop_position, self._sources)
-
- def read(self, range_tracker):
- start_source, _ = range_tracker.start_position()
- stop_source, stop_pos = range_tracker.stop_position()
- if stop_pos is not None:
- stop_source += 1
- for source_ix in range(start_source, stop_source):
- if not range_tracker.try_claim((source_ix, None)):
- break
- for record in self._sources[source_ix].read(
- range_tracker.sub_range_tracker(source_ix)):
- yield record
-
- def default_output_coder(self):
- if self._sources:
- # Getting coder from the first sub-sources. This assumes all sub-sources
- # to produce the same coder.
- return self._sources[0].default_output_coder()
- else:
- # Defaulting to PickleCoder.
- return super(ConcatSource, self).default_output_coder()
-
-
-_ConcatSource = ConcatSource
-
-
class FileBasedSource(iobase.BoundedSource):
"""A ``BoundedSource`` for reading a file glob of a given type."""
@@ -294,7 +107,7 @@ class FileBasedSource(iobase.BoundedSource):
sizes[index],
min_bundle_size=self._min_bundle_size)
single_file_sources.append(single_file_source)
- self._concat_source = ConcatSource(single_file_sources)
+ self._concat_source = iobase.ConcatSource(single_file_sources)
return self._concat_source
def open_file(self, file_name):
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d65d17aa/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index 50f0a22..3efcc16 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -30,7 +30,7 @@ from apache_beam.io import iobase
from apache_beam.io import range_trackers
# importing following private classes for testing
-from apache_beam.io.filebasedsource import _ConcatSource as ConcatSource
+from apache_beam.io.iobase import ConcatSource
from apache_beam.io.filebasedsource import _SingleFileSource as SingleFileSource
from apache_beam.io.filebasedsource import FileBasedSource
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d65d17aa/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 4305fb6..1205a26 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -31,8 +31,10 @@ the sink.
from collections import namedtuple
+import bisect
import logging
import random
+import threading
import uuid
from apache_beam import pvalue
@@ -1097,3 +1099,185 @@ class _RoundRobinKeyFn(core.DoFn):
if self.counter >= self.count:
self.counter -= self.count
yield self.counter, context.element
+
+
+class ConcatSource(BoundedSource):
+ """A ``BoundedSource`` that reads a list of sub-sources one after another."""
+
+ class ConcatRangeTracker(RangeTracker):  # positions are (source_ix, sub_pos)
+
+ def __init__(self, start, end, sources, weights=None):
+ super(ConcatSource.ConcatRangeTracker, self).__init__()
+ self._start = start
+ self._end = end
+ self._lock = threading.RLock()
+ self._sources = sources
+ self._range_trackers = [None] * len(self._sources)
+ self._claimed_source_ix = self._start[0]
+
+ if weights is None:  # equal weight per source in [start[0], end[0])
+ n = max(1, end[0] - start[0])
+ self._cumulative_weights = [
+ max(0, min(1, float(k) / n))
+ for k in range(-start[0], len(self._sources) - start[0] + 1)]
+ else:
+ assert len(sources) == len(weights)
+ relevant_weights = weights[start[0]:end[0] + 1]
+ # TODO(robertwb): Implement fraction-at-position to properly scale
+ # partial start and end sources.
+ total = sum(relevant_weights)
+ running_total = [0]  # global fraction at the start of each source
+ for w in relevant_weights:
+ running_total.append(min(1, running_total[-1] + float(w) / total))
+ running_total[-1] = 1 # In case of rounding error.
+ self._cumulative_weights = (
+ [0] * start[0]
+ + running_total
+ + [1] * (len(self._sources) - end[0] - 1))  # len(sources) + 1 total
+
+ def start_position(self):
+ return self._start
+
+ def stop_position(self):
+ return self._end
+
+ def try_claim(self, pos):
+ source_ix, source_pos = pos
+ with self._lock:
+ if source_ix > self._end[0]:
+ return False
+ elif source_ix == self._end[0] and self._end[1] is None:
+ return False
+ else:
+ self._claimed_source_ix = source_ix
+ if source_pos is None:
+ return True
+ else:
+ return self.sub_range_tracker(source_ix).try_claim(source_pos)
+
+ def try_split(self, pos):
+ source_ix, source_pos = pos
+ with self._lock:
+ if source_ix < self._claimed_source_ix:
+ # Already claimed.
+ return None
+ elif source_ix > self._end[0]:
+ # After end.
+ return None
+ elif source_ix == self._end[0] and self._end[1] is None:
+ # At/after end.
+ return None
+ else:
+ if source_ix > self._claimed_source_ix:
+ # Prefer to split on a source boundary.
+ split_pos = None
+ ratio = self._cumulative_weights[source_ix]
+ else:
+ # Split the current subsource.
+ split = self.sub_range_tracker(source_ix).try_split(
+ source_pos)
+ if not split:
+ return None
+ split_pos, frac = split
+ ratio = self.local_to_global(source_ix, frac)
+
+ self._end = source_ix, split_pos
+ self._cumulative_weights = [min(w / ratio, 1)  # rescale to new end
+ for w in self._cumulative_weights]
+ return (source_ix, split_pos), ratio
+
+ def set_current_position(self, pos):
+ raise NotImplementedError('Should only be called on sub-trackers')
+
+ def position_at_fraction(self, fraction):
+ source_ix, source_frac = self.global_to_local(fraction)
+ if source_ix == len(self._sources):
+ return (source_ix, None)
+ else:
+ return (source_ix,
+ self.sub_range_tracker(source_ix).position_at_fraction(
+ source_frac))
+
+ def fraction_consumed(self):
+ with self._lock:
+ return self.local_to_global(self._claimed_source_ix,
+ self.sub_range_tracker(
+ self._claimed_source_ix)
+ .fraction_consumed())
+
+ def local_to_global(self, source_ix, source_frac):
+ cw = self._cumulative_weights
+ return cw[source_ix] + source_frac * (cw[source_ix + 1] - cw[source_ix])
+
+ def global_to_local(self, frac):
+ if frac == 1:
+ return (len(self._sources), 0)
+ else:
+ cw = self._cumulative_weights
+ source_ix = bisect.bisect(cw, frac) - 1
+ return (source_ix,
+ (frac - cw[source_ix]) / (cw[source_ix + 1] - cw[source_ix]))
+
+ def sub_range_tracker(self, source_ix):
+ assert self._start[0] <= source_ix <= self._end[0]
+ if self._range_trackers[source_ix] is None:
+ with self._lock:
+ if self._range_trackers[source_ix] is None:
+ self._range_trackers[source_ix] = (
+ self._sources[source_ix].get_range_tracker(
+ self._start[1] if source_ix == self._start[0] else None,
+ self._end[1] if source_ix == self._end[0] else None))
+ return self._range_trackers[source_ix]
+
+ def __init__(self, sources):
+ self._sources = sources
+
+ @property
+ def sources(self):
+ return self._sources
+
+ def estimate_size(self):
+ return sum(s.estimate_size() for s in self._sources)
+
+ def split(
+ self, desired_bundle_size=None, start_position=None, stop_position=None):
+ if start_position or stop_position:
+ raise ValueError(
+ 'Multi-level initial splitting is not supported. Expected start and '
+ 'stop positions to be None. Received %r and %r respectively.' %
+ (start_position, stop_position))
+
+ for source in self._sources:
+ # We assume all sub-sources to produce bundles that specify weight using
+ # the same unit. For example, all sub-sources may specify the size in
+ # bytes as their weight.
+ for bundle in source.split(desired_bundle_size, None, None):
+ yield bundle
+
+ def get_range_tracker(self, start_position=None, stop_position=None):
+ if start_position is None:
+ start_position = (0, None)
+ if stop_position is None:
+ stop_position = (len(self._sources), None)
+ return self.ConcatRangeTracker(start_position, stop_position, self._sources)
+
+ def read(self, range_tracker):
+ start_source, _ = range_tracker.start_position()
+ stop_source, stop_pos = range_tracker.stop_position()
+ if stop_pos is not None:  # stop is inside stop_source: read it partially
+ stop_source += 1
+ for source_ix in range(start_source, stop_source):
+ if not range_tracker.try_claim((source_ix, None)):
+ break
+ for record in self._sources[source_ix].read(
+ range_tracker.sub_range_tracker(source_ix)):
+ yield record
+
+ def default_output_coder(self):
+ if self._sources:
+ # Getting coder from the first sub-sources. This assumes all sub-sources
+ # to produce the same coder.
+ return self._sources[0].default_output_coder()
+ else:
+ # Defaulting to PickleCoder.
+ return super(ConcatSource, self).default_output_coder()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d65d17aa/sdks/python/apache_beam/io/sources_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/sources_test.py b/sdks/python/apache_beam/io/sources_test.py
index 3f83d80..4d16560 100644
--- a/sdks/python/apache_beam/io/sources_test.py
+++ b/sdks/python/apache_beam/io/sources_test.py
@@ -26,7 +26,6 @@ import apache_beam as beam
from apache_beam import coders
from apache_beam.io import iobase
-from apache_beam.io.filebasedsource import ConcatSource
from apache_beam.io import range_trackers
from apache_beam.io import source_test_utils
from apache_beam.transforms.util import assert_that
@@ -149,11 +148,11 @@ class SourcesTest(unittest.TestCase):
source_test_utils.assertSplitAtFractionExhaustive(RangeSource(0, 10, 3))
def test_conact_source(self):
- source = ConcatSource([RangeSource(0, 4),
- RangeSource(4, 8),
- RangeSource(8, 12),
- RangeSource(12, 16),
- ])
+ source = iobase.ConcatSource([RangeSource(0, 4),
+ RangeSource(4, 8),
+ RangeSource(8, 12),
+ RangeSource(12, 16),
+ ])
self.assertEqual(list(source.read(source.get_range_tracker())),
range(16))
self.assertEqual(list(source.read(source.get_range_tracker((1, None),
@@ -187,10 +186,10 @@ class SourcesTest(unittest.TestCase):
self.assertEqual(range_tracker.sub_range_tracker(2).try_claim(11), False)
def test_conact_source_exhaustive(self):
- source = ConcatSource([RangeSource(0, 10),
- RangeSource(100, 110),
- RangeSource(1000, 1010),
- ])
+ source = iobase.ConcatSource([RangeSource(0, 10),
+ RangeSource(100, 110),
+ RangeSource(1000, 1010),
+ ])
source_test_utils.assertSplitAtFractionExhaustive(source)
if __name__ == '__main__':
[4/6] incubator-beam git commit: Implement liquid sharding for concat
source.
Posted by ro...@apache.org.
Implement liquid sharding for concat source.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c7d5a37d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c7d5a37d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c7d5a37d
Branch: refs/heads/python-sdk
Commit: c7d5a37dce37dacd906202191705054032426ba5
Parents: c1f42e6
Author: Robert Bradshaw <ro...@google.com>
Authored: Tue Sep 13 16:37:06 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Fri Sep 23 13:44:57 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/filebasedsource.py | 166 ++++++++++++++++++---
sdks/python/apache_beam/io/sources_test.py | 89 +++++++++++
2 files changed, 238 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c7d5a37d/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 5204f04..dd91ef2 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -25,20 +25,146 @@ for more details.
For an example implementation of ``FileBasedSource`` see ``avroio.AvroSource``.
"""
+import bisect
from multiprocessing.pool import ThreadPool
+import threading
from apache_beam.internal import pickler
from apache_beam.io import fileio
from apache_beam.io import iobase
-
-import range_trackers
+from apache_beam.io import range_trackers
MAX_NUM_THREADS_FOR_SIZE_ESTIMATION = 25
-class _ConcatSource(iobase.BoundedSource):
+class ConcatSource(iobase.BoundedSource):
"""A ``BoundedSource`` that can group a set of ``BoundedSources``."""
+ class ConcatRangeTracker(iobase.RangeTracker):  # positions: (source_ix, sub_pos)
+
+ def __init__(self, start, end, sources, weights=None):
+ super(ConcatSource.ConcatRangeTracker, self).__init__()
+ self._start = start
+ self._end = end
+ self._lock = threading.RLock()
+ self._sources = sources
+ self._range_trackers = [None] * len(self._sources)
+ self._claimed_source_ix = self._start[0]
+
+ if weights is None:  # equal weight per source in [start[0], end[0])
+ n = max(1, end[0] - start[0])
+ self._cumulative_weights = [
+ max(0, min(1, float(k) / n))
+ for k in range(-start[0], len(self._sources) - start[0] + 1)]
+ else:
+ assert len(sources) == len(weights)
+ relevant_weights = weights[start[0]:end[0] + 1]
+ # TODO(robertwb): Implement fraction-at-position to properly scale
+ # partial start and end sources.
+ total = sum(relevant_weights)
+ running_total = [0]  # global fraction at the start of each source
+ for w in relevant_weights:
+ running_total.append(min(1, running_total[-1] + float(w) / total))
+ running_total[-1] = 1 # In case of rounding error.
+ self._cumulative_weights = (
+ [0] * start[0]
+ + running_total
+ + [1] * (len(self._sources) - end[0] - 1))  # len(sources) + 1 total
+
+ def start_position(self):
+ return self._start
+
+ def stop_position(self):
+ return self._end
+
+ def try_claim(self, pos):
+ source_ix, source_pos = pos
+ with self._lock:
+ if source_ix > self._end[0]:
+ return False
+ elif source_ix == self._end[0] and self._end[1] is None:
+ return False
+ else:
+ self._claimed_source_ix = source_ix
+ if source_pos is None:
+ return True
+ else:
+ return self.sub_range_tracker(source_ix).try_claim(source_pos)
+
+ def try_split(self, pos):
+ source_ix, source_pos = pos
+ with self._lock:
+ if source_ix < self._claimed_source_ix:
+ # Already claimed.
+ return None
+ elif source_ix > self._end[0]:
+ # After end.
+ return None
+ elif source_ix == self._end[0] and self._end[1] is None:
+ # At/after end.
+ return None
+ else:
+ if source_ix > self._claimed_source_ix:
+ # Prefer to split on a source boundary.
+ split_pos = None
+ ratio = self._cumulative_weights[source_ix]
+ else:
+ # Split the current subsource.
+ split = self.sub_range_tracker(source_ix).try_split(
+ source_pos)
+ if not split:
+ return None
+ split_pos, frac = split
+ ratio = self.local_to_global(source_ix, frac)
+
+ self._end = source_ix, split_pos
+ self._cumulative_weights = [min(w / ratio, 1)  # rescale to new end
+ for w in self._cumulative_weights]
+ return (source_ix, split_pos), ratio
+
+ def set_current_position(self, pos):
+ raise NotImplementedError('Should only be called on sub-trackers')
+
+ def position_at_fraction(self, fraction):
+ source_ix, source_frac = self.global_to_local(fraction)
+ if source_ix == len(self._sources):
+ return (source_ix, None)
+ else:
+ return (source_ix,
+ self.sub_range_tracker(source_ix).position_at_fraction(
+ source_frac))
+
+ def fraction_consumed(self):
+ with self._lock:
+ return self.local_to_global(self._claimed_source_ix,
+ self.sub_range_tracker(
+ self._claimed_source_ix)
+ .fraction_consumed())
+
+ def local_to_global(self, source_ix, source_frac):
+ cw = self._cumulative_weights
+ return cw[source_ix] + source_frac * (cw[source_ix + 1] - cw[source_ix])
+
+ def global_to_local(self, frac):
+ if frac == 1:
+ return (len(self._sources), 0)
+ else:
+ cw = self._cumulative_weights
+ source_ix = bisect.bisect(cw, frac) - 1
+ return (source_ix,
+ (frac - cw[source_ix]) / (cw[source_ix + 1] - cw[source_ix]))
+
+ def sub_range_tracker(self, source_ix):
+ assert self._start[0] <= source_ix <= self._end[0]
+ if self._range_trackers[source_ix] is None:
+ with self._lock:
+ if self._range_trackers[source_ix] is None:
+ self._range_trackers[source_ix] = (
+ self._sources[source_ix].get_range_tracker(
+ self._start[1] if source_ix == self._start[0] else None,
+ self._end[1] if source_ix == self._end[0] else None))
+ return self._range_trackers[source_ix]
+
def __init__(self, sources):
self._sources = sources
@@ -64,20 +190,23 @@ class _ConcatSource(iobase.BoundedSource):
for bundle in source.split(desired_bundle_size, None, None):
yield bundle
- def get_range_tracker(self, start_position, stop_position):
- assert start_position is None
- assert stop_position is None
- # This will be invoked only when FileBasedSource is read without splitting.
- # For that case, we only support reading the whole source.
- return range_trackers.OffsetRangeTracker(0, len(self.sources))
+ def get_range_tracker(self, start_position=None, stop_position=None):
+ if start_position is None:
+ start_position = (0, None)
+ if stop_position is None:
+ stop_position = (len(self._sources), None)
+ return self.ConcatRangeTracker(start_position, stop_position, self._sources)
def read(self, range_tracker):
- for index, sub_source in enumerate(self.sources):
- if not range_tracker.try_claim(index):
- return
-
- sub_source_tracker = sub_source.get_range_tracker(None, None)
- for record in sub_source.read(sub_source_tracker):
+ start_source, _ = range_tracker.start_position()
+ stop_source, stop_pos = range_tracker.stop_position()
+ if stop_pos is not None:
+ stop_source += 1
+ for source_ix in range(start_source, stop_source):
+ if not range_tracker.try_claim((source_ix, None)):
+ break
+ for record in self._sources[source_ix].read(
+ range_tracker.sub_range_tracker(source_ix)):
yield record
def default_output_coder(self):
@@ -87,7 +216,10 @@ class _ConcatSource(iobase.BoundedSource):
return self._sources[0].default_output_coder()
else:
# Defaulting to PickleCoder.
- return super(_ConcatSource, self).default_output_coder()
+ return super(ConcatSource, self).default_output_coder()
+
+
+_ConcatSource = ConcatSource
class FileBasedSource(iobase.BoundedSource):
@@ -162,7 +294,7 @@ class FileBasedSource(iobase.BoundedSource):
sizes[index],
min_bundle_size=self._min_bundle_size)
single_file_sources.append(single_file_source)
- self._concat_source = _ConcatSource(single_file_sources)
+ self._concat_source = ConcatSource(single_file_sources)
return self._concat_source
def open_file(self, file_name):
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c7d5a37d/sdks/python/apache_beam/io/sources_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/sources_test.py b/sdks/python/apache_beam/io/sources_test.py
index 293f437..3f83d80 100644
--- a/sdks/python/apache_beam/io/sources_test.py
+++ b/sdks/python/apache_beam/io/sources_test.py
@@ -26,7 +26,9 @@ import apache_beam as beam
from apache_beam import coders
from apache_beam.io import iobase
+from apache_beam.io.filebasedsource import ConcatSource
from apache_beam.io import range_trackers
+from apache_beam.io import source_test_utils
from apache_beam.transforms.util import assert_that
from apache_beam.transforms.util import equal_to
@@ -80,6 +82,45 @@ class LineSource(iobase.BoundedSource):
return coders.BytesCoder()
+class RangeSource(iobase.BoundedSource):
+ # Test source yielding the multiples of split_freq in [start, end).
+ def __init__(self, start, end, split_freq=1):
+ assert start <= end
+ self._start = start
+ self._end = end
+ self._split_freq = split_freq
+
+ def _normalize(self, start_position, end_position):
+ return (self._start if start_position is None else start_position,
+ self._end if end_position is None else end_position)
+
+ def _round_up(self, index):
+ """Rounds up to the nearest multiple of split_freq."""
+ return index - index % -self._split_freq
+
+ def estimate_size(self):
+ return self._end - self._start
+
+ def split(self, desired_bundle_size, start_position=None, end_position=None):
+ start, end = self._normalize(start_position, end_position)
+ for sub_start in range(start, end, desired_bundle_size):
+ sub_end = min(end, sub_start + desired_bundle_size)
+ yield iobase.SourceBundle(sub_end - sub_start,  # weight, source, start, stop
+ RangeSource(sub_start, sub_end, self._split_freq), None, None)
+
+ def get_range_tracker(self, start_position, end_position):
+ start, end = self._normalize(start_position, end_position)
+ return range_trackers.OffsetRangeTracker(start, end)
+
+ def read(self, range_tracker):
+ for k in range(self._round_up(range_tracker.start_position()),
+ self._round_up(range_tracker.stop_position())):
+ if k % self._split_freq == 0:
+ if not range_tracker.try_claim(k):
+ return
+ yield k
+
+
class SourcesTest(unittest.TestCase):
def _create_temp_file(self, contents):
@@ -104,6 +145,54 @@ class SourcesTest(unittest.TestCase):
pipeline.run()
+ def test_range_source(self):  # split_freq=3: only 0, 3, 6, 9 are claimed
+ source_test_utils.assertSplitAtFractionExhaustive(RangeSource(0, 10, 3))
+
+ def test_conact_source(self):  # four equal-weight sub-sources over 0..15
+ source = ConcatSource([RangeSource(0, 4),
+ RangeSource(4, 8),
+ RangeSource(8, 12),
+ RangeSource(12, 16),
+ ])
+ self.assertEqual(list(source.read(source.get_range_tracker())),
+ range(16))
+ self.assertEqual(list(source.read(source.get_range_tracker((1, None),
+ (2, 10)))),
+ range(4, 10))
+ range_tracker = source.get_range_tracker(None, None)
+ self.assertEqual(range_tracker.position_at_fraction(0), (0, 0))
+ self.assertEqual(range_tracker.position_at_fraction(.5), (2, 8))  # start of source 2
+ self.assertEqual(range_tracker.position_at_fraction(.625), (2, 10))
+
+ # Simulate a read; each sub-source covers 1/4 of the global fraction.
+ self.assertEqual(range_tracker.try_claim((0, None)), True)
+ self.assertEqual(range_tracker.sub_range_tracker(0).try_claim(2), True)
+ self.assertEqual(range_tracker.fraction_consumed(), 0.125)
+
+ self.assertEqual(range_tracker.try_claim((1, None)), True)
+ self.assertEqual(range_tracker.sub_range_tracker(1).try_claim(6), True)
+ self.assertEqual(range_tracker.fraction_consumed(), 0.375)
+ self.assertEqual(range_tracker.try_split((0, 1)), None)  # source 0 already passed
+ self.assertEqual(range_tracker.try_split((1, 5)), None)  # 5 precedes claimed 6
+
+ self.assertEqual(range_tracker.try_split((3, 14)), ((3, None), 0.75))  # snaps to source 3 start
+ self.assertEqual(range_tracker.try_claim((3, None)), False)  # source 3 now past the end
+ self.assertEqual(range_tracker.sub_range_tracker(1).try_claim(7), True)
+ self.assertEqual(range_tracker.try_claim((2, None)), True)
+ self.assertEqual(range_tracker.sub_range_tracker(2).try_claim(9), True)
+
+ self.assertEqual(range_tracker.try_split((2, 8)), None)  # 8 precedes claimed 9
+ self.assertEqual(range_tracker.try_split((2, 11)), ((2, 11), 11. / 12))  # 2/3 + 3/4 * 1/3
+ self.assertEqual(range_tracker.sub_range_tracker(2).try_claim(10), True)
+ self.assertEqual(range_tracker.sub_range_tracker(2).try_claim(11), False)  # 11 is the new stop
+
+ def test_conact_source_exhaustive(self):  # sub-sources with disjoint value ranges
+ source = ConcatSource([RangeSource(0, 10),
+ RangeSource(100, 110),
+ RangeSource(1000, 1010),
+ ])
+ source_test_utils.assertSplitAtFractionExhaustive(source)
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()