You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/24 03:23:20 UTC

[2/2] incubator-beam git commit: Updates FileBasedSource so that a subclass can prevent splitting into data ranges.

Updates FileBasedSource so that a subclass can prevent splitting into data ranges.

File patterns are still split into one source per individual file, but any further splitting of a file into data ranges is prevented. This applies to both initial splitting and dynamic splitting.

Introduces UnsplittableRangeTracker, which wraps any given RangeTracker object and makes it unsplittable by ignoring all try_split() calls while delegating every other call to the wrapped tracker.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2e0a63cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2e0a63cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2e0a63cc

Branch: refs/heads/python-sdk
Commit: 2e0a63ccf4c2efe64b7e9dc814c6f854703cf933
Parents: 77ce9b7
Author: Chamikara Jayalath <ch...@google.com>
Authored: Mon Aug 22 21:06:36 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Aug 23 20:22:57 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/filebasedsource.py   | 52 ++++++++++++++------
 .../apache_beam/io/filebasedsource_test.py      | 34 ++++++++++---
 sdks/python/apache_beam/io/iobase.py            |  3 +-
 sdks/python/apache_beam/io/range_trackers.py    | 40 +++++++++++++++
 .../apache_beam/io/range_trackers_test.py       | 24 +++++++++
 5 files changed, 130 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2e0a63cc/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index aa0820d..4c9cd95 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -91,7 +91,7 @@ class _ConcatSource(iobase.BoundedSource):
 class FileBasedSource(iobase.BoundedSource):
   """A ``BoundedSource`` for reading a file glob of a given type."""
 
-  def __init__(self, file_pattern, min_bundle_size=0):
+  def __init__(self, file_pattern, min_bundle_size=0, splittable=True):
     """Initializes ``FileBasedSource``.
 
     Args:
@@ -102,6 +102,7 @@ class FileBasedSource(iobase.BoundedSource):
     self._pattern = file_pattern
     self._concat_source = None
     self._min_bundle_size = min_bundle_size
+    self._splittable = splittable
 
   def _get_concat_source(self):
     if self._concat_source is None:
@@ -174,6 +175,10 @@ class FileBasedSource(iobase.BoundedSource):
     """
     raise NotImplementedError
 
+  @property
+  def splittable(self):
+    return self._splittable
+
 
 class _SingleFileSource(iobase.BoundedSource):
   """Denotes a source for a specific file type.
@@ -207,22 +212,36 @@ class _SingleFileSource(iobase.BoundedSource):
     if stop_offset is None:
       stop_offset = self._stop_offset
 
-    bundle_size = max(desired_bundle_size, self._min_bundle_size)
-
-    bundle_start = start_offset
-    while bundle_start < stop_offset:
-      bundle_stop = min(bundle_start + bundle_size, stop_offset)
+    if self._file_based_source.splittable:
+      bundle_size = max(desired_bundle_size, self._min_bundle_size)
+
+      bundle_start = start_offset
+      while bundle_start < stop_offset:
+        bundle_stop = min(bundle_start + bundle_size, stop_offset)
+        yield iobase.SourceBundle(
+            bundle_stop - bundle_start,
+            _SingleFileSource(
+                self._file_based_source,
+                self._file_name,
+                bundle_start,
+                bundle_stop,
+                min_bundle_size=self._min_bundle_size),
+            bundle_start,
+            bundle_stop)
+        bundle_start = bundle_stop
+    else:
       yield iobase.SourceBundle(
-          bundle_stop - bundle_start,
+          stop_offset - start_offset,
           _SingleFileSource(
               self._file_based_source,
               self._file_name,
-              bundle_start,
-              bundle_stop,
-              min_bundle_size=self._min_bundle_size),
-          bundle_start,
-          bundle_stop)
-      bundle_start = bundle_stop
+              start_offset,
+              stop_offset,
+              min_bundle_size=self._min_bundle_size
+          ),
+          start_offset,
+          stop_offset
+      )
 
   def estimate_size(self):
     return self._stop_offset - self._start_offset
@@ -233,7 +252,12 @@ class _SingleFileSource(iobase.BoundedSource):
     if stop_position is None:
       stop_position = self._stop_offset
 
-    return range_trackers.OffsetRangeTracker(start_position, stop_position)
+    range_tracker = range_trackers.OffsetRangeTracker(
+        start_position, stop_position)
+    if not self._file_based_source.splittable:
+      range_tracker = range_trackers.UnsplittableRangeTracker(range_tracker)
+
+    return range_tracker
 
   def read(self, range_tracker):
     return self._file_based_source.read_records(self._file_name, range_tracker)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2e0a63cc/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index 1bf51b2..ed67346 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -251,21 +251,39 @@ class TestFileBasedSource(unittest.TestCase):
 
     self.assertItemsEqual(expected_data, read_data)
 
-  def test_dataflow_file(self):
-    file_name, expected_data = _write_data(100)
-    assert len(expected_data) == 100
+  def _run_dataflow_test(self, pattern, expected_data, splittable=True):
     pipeline = beam.Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | 'Read' >> beam.Read(LineSource(file_name))
+    pcoll = pipeline | 'Read' >> beam.Read(LineSource(
+        pattern, splittable=splittable))
     assert_that(pcoll, equal_to(expected_data))
     pipeline.run()
 
+  def test_dataflow_file(self):
+    file_name, expected_data = _write_data(100)
+    assert len(expected_data) == 100
+    self._run_dataflow_test(file_name, expected_data)
+
   def test_dataflow_pattern(self):
     pattern, expected_data = _write_pattern([34, 66, 40, 24, 24, 12])
     assert len(expected_data) == 200
-    pipeline = beam.Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | 'Read' >> beam.Read(LineSource(pattern))
-    assert_that(pcoll, equal_to(expected_data))
-    pipeline.run()
+    self._run_dataflow_test(pattern, expected_data)
+
+  def test_unsplittable_does_not_split(self):
+    pattern, expected_data = _write_pattern([5, 9, 6])
+    assert len(expected_data) == 20
+    fbs = LineSource(pattern, splittable=False)
+    splits = [split for split in fbs.split(desired_bundle_size=15)]
+    self.assertEquals(3, len(splits))
+
+  def test_dataflow_file_unsplittable(self):
+    file_name, expected_data = _write_data(100)
+    assert len(expected_data) == 100
+    self._run_dataflow_test(file_name, expected_data, False)
+
+  def test_dataflow_pattern_unsplittable(self):
+    pattern, expected_data = _write_pattern([34, 66, 40, 24, 24, 12])
+    assert len(expected_data) == 200
+    self._run_dataflow_test(pattern, expected_data, False)
 
 
 class TestSingleFileSource(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2e0a63cc/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index b683eb2..b269ae5 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -638,7 +638,8 @@ class RangeTracker(object):
       position: suggested position where the current range should try to
                 be split at.
     Returns:
-      a tuple containing the split position and split fraction.
+      a tuple containing the split position and split fraction if the split
+      is successful. Returns ``None`` otherwise.
     """
     raise NotImplementedError
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2e0a63cc/sdks/python/apache_beam/io/range_trackers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/range_trackers.py b/sdks/python/apache_beam/io/range_trackers.py
index a736162..af6f6c8 100644
--- a/sdks/python/apache_beam/io/range_trackers.py
+++ b/sdks/python/apache_beam/io/range_trackers.py
@@ -284,3 +284,43 @@ class GroupedShuffleRangeTracker(iobase.RangeTracker):
     raise RuntimeError('GroupedShuffleRangeTracker does not measure fraction'
                        ' consumed due to positions being opaque strings'
                        ' that are interpreted by the service')
+
+
+class UnsplittableRangeTracker(iobase.RangeTracker):
+  """A RangeTracker that always ignores split requests.
+
+  This can be used to make a given ``RangeTracker`` object unsplittable by
+  ignoring all calls to ``try_split()``. All other calls will be delegated to
+  the given ``RangeTracker``.
+  """
+
+  def __init__(self, range_tracker):
+    """Initializes UnsplittableRangeTracker.
+
+    Args:
+      range_tracker: a ``RangeTracker`` to which all method calls except calls
+      to ``try_split()`` will be delegated.
+    """
+    assert range_tracker
+    self._range_tracker = range_tracker
+
+  def start_position(self):
+    return self._range_tracker.start_position()
+
+  def stop_position(self):
+    return self._range_tracker.stop_position()
+
+  def position_at_fraction(self, fraction):
+    return self._range_tracker.position_at_fraction(fraction)
+
+  def try_claim(self, position):
+    return self._range_tracker.try_claim(position)
+
+  def try_split(self, position):
+    return None
+
+  def set_current_position(self, position):
+    self._range_tracker.set_current_position(position)
+
+  def fraction_consumed(self):
+    return self._range_tracker.fraction_consumed()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2e0a63cc/sdks/python/apache_beam/io/range_trackers_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/range_trackers_test.py b/sdks/python/apache_beam/io/range_trackers_test.py
index 77733d3..c4c1e28 100644
--- a/sdks/python/apache_beam/io/range_trackers_test.py
+++ b/sdks/python/apache_beam/io/range_trackers_test.py
@@ -319,6 +319,30 @@ class GroupedShuffleRangeTrackerTest(unittest.TestCase):
         self.bytes_to_position([3, 2, 1])))
 
 
+class UnsplittableRangeTrackerTest(unittest.TestCase):
+
+  def test_try_claim(self):
+    tracker = range_trackers.UnsplittableRangeTracker(
+        range_trackers.OffsetRangeTracker(100, 200))
+    self.assertTrue(tracker.try_claim(110))
+    self.assertTrue(tracker.try_claim(140))
+    self.assertTrue(tracker.try_claim(183))
+    self.assertFalse(tracker.try_claim(210))
+
+  def test_try_split_fails(self):
+    tracker = range_trackers.UnsplittableRangeTracker(
+        range_trackers.OffsetRangeTracker(100, 200))
+    self.assertTrue(tracker.try_claim(110))
+    # Out of range
+    self.assertFalse(tracker.try_split(109))
+    self.assertFalse(tracker.try_split(210))
+
+    # Within range. But splitting is still unsuccessful.
+    self.assertFalse(copy.copy(tracker).try_split(111))
+    self.assertFalse(copy.copy(tracker).try_split(130))
+    self.assertFalse(copy.copy(tracker).try_split(199))
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()