You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/11/01 05:21:32 UTC

[2/2] incubator-beam git commit: Improvements related to size estimation.

Improvements related to size estimation.

Updates FileBasedSource so that size estimation of glob patterns that expand into a large number of files is done using sampling.

Updates Dataflow runner to set estimated sizes of sources when submitting jobs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f083ed70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f083ed70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f083ed70

Branch: refs/heads/python-sdk
Commit: f083ed706454170782708ba8e5804f63e3c44f83
Parents: 13dd9ff
Author: chamikara@google.com <ch...@google.com>
Authored: Mon Oct 31 07:39:13 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Mon Oct 31 22:21:21 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/internal/json_value.py  |  4 +--
 sdks/python/apache_beam/io/filebasedsource.py   | 22 ++++++++++++++-
 .../apache_beam/io/filebasedsource_test.py      | 28 ++++++++++++++++++++
 .../apache_beam/runners/dataflow_runner.py      | 25 ++++++++++++-----
 4 files changed, 70 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f083ed70/sdks/python/apache_beam/internal/json_value.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/json_value.py b/sdks/python/apache_beam/internal/json_value.py
index b7f6cc0..5a3a286 100644
--- a/sdks/python/apache_beam/internal/json_value.py
+++ b/sdks/python/apache_beam/internal/json_value.py
@@ -20,7 +20,7 @@
 from apitools.base.py import extra_types
 
 
-def _get_typed_value_descriptor(obj):
+def get_typed_value_descriptor(obj):
   """Converts a basic type into a @type/value dictionary.
 
   Args:
@@ -80,7 +80,7 @@ def to_json_value(obj, with_type=False):
               key=k, value=to_json_value(v, with_type=with_type)))
     return extra_types.JsonValue(object_value=json_object)
   elif with_type:
-    return to_json_value(_get_typed_value_descriptor(obj), with_type=False)
+    return to_json_value(get_typed_value_descriptor(obj), with_type=False)
   elif isinstance(obj, basestring):
     return extra_types.JsonValue(string_value=obj)
   elif isinstance(obj, bool):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f083ed70/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 931628c..77d8840 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -25,6 +25,8 @@ for more details.
 For an example implementation of ``FileBasedSource`` see ``avroio.AvroSource``.
 """
 
+import random
+
 from multiprocessing.pool import ThreadPool
 
 from apache_beam.internal import pickler
@@ -39,6 +41,9 @@ MAX_NUM_THREADS_FOR_SIZE_ESTIMATION = 25
 class FileBasedSource(iobase.BoundedSource):
   """A ``BoundedSource`` for reading a file glob of a given type."""
 
+  MIN_NUMBER_OF_FILES_TO_STAT = 100
+  MIN_FRACTION_OF_FILES_TO_STAT = 0.01
+
   def __init__(self,
                file_pattern,
                min_bundle_size=0,
@@ -142,7 +147,22 @@ class FileBasedSource(iobase.BoundedSource):
         stop_position=stop_position)
 
   def estimate_size(self):
-    return self._get_concat_source().estimate_size()
+    file_names = [f for f in fileio.ChannelFactory.glob(self._pattern)]
+    if (len(file_names) <=
+        FileBasedSource.MIN_NUMBER_OF_FILES_TO_STAT):
+      return sum(self._estimate_sizes_in_parallel(file_names))
+    else:
+      # Estimating size of a random sample.
+      # TODO: better support distributions where file sizes are not
+      # approximately equal.
+      sample_size = max(FileBasedSource.MIN_NUMBER_OF_FILES_TO_STAT,
+                        int(len(file_names) *
+                            FileBasedSource.MIN_FRACTION_OF_FILES_TO_STAT))
+      sample = random.sample(file_names, sample_size)
+      estimate = self._estimate_sizes_in_parallel(sample)
+      return int(
+          sum(estimate) *
+          (float(len(file_names)) / len(sample)))
 
   def read(self, range_tracker):
     return self._get_concat_source().read(range_tracker)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f083ed70/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index 2c68d2e..d07c1df 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -20,6 +20,7 @@ import cStringIO as StringIO
 import gzip
 import logging
 import math
+import random
 import os
 import tempfile
 import unittest
@@ -256,6 +257,33 @@ class TestFileBasedSource(unittest.TestCase):
     fbs = LineSource(pattern)
     self.assertEquals(17 * 6, fbs.estimate_size())
 
+  def test_estimate_size_with_sampling_same_size(self):
+    num_files = 2 * FileBasedSource.MIN_NUMBER_OF_FILES_TO_STAT
+    pattern, _ = write_pattern([10] * num_files)
+    # Each line will be of length 6 since write_pattern() uses
+    # ('line' + line number + '\n') as data.
+    self.assertEqual(
+        6 * 10 * num_files, FileBasedSource(pattern).estimate_size())
+
+  def test_estimate_size_with_sampling_different_sizes(self):
+    num_files = 2 * FileBasedSource.MIN_NUMBER_OF_FILES_TO_STAT
+
+    # Each line will be of length 8 since write_pattern() uses
+    # ('line' + line number + '\n') as data.
+    base_size = 500
+    variance = 5
+
+    sizes = []
+    for _ in xrange(num_files):
+      sizes.append(int(random.uniform(base_size - variance,
+                                      base_size + variance)))
+    pattern, _ = write_pattern(sizes)
+    tolerance = 0.05
+    self.assertAlmostEqual(
+        base_size * 8 * num_files,
+        FileBasedSource(pattern).estimate_size(),
+        delta=base_size * 8 * num_files * tolerance)
+
   def test_splits_into_subranges(self):
     pattern, expected_data = write_pattern([5, 9, 6])
     assert len(expected_data) == 20

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f083ed70/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index a387332..1d0398c 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -25,10 +25,11 @@ import base64
 import logging
 import threading
 import time
-
+import traceback
 
 from apache_beam import coders
 from apache_beam import pvalue
+from apache_beam.internal import json_value
 from apache_beam.internal import pickler
 from apache_beam.pvalue import PCollectionView
 from apache_beam.runners.runner import PipelineResult
@@ -491,12 +492,24 @@ class DataflowPipelineRunner(PipelineRunner):
 
     if not hasattr(transform.source, 'format'):
       # If a format is not set, we assume the source to be a custom source.
-      source_dict = dict()
-      spec_dict = dict()
+      source_dict = {}
+
+      source_dict['spec'] = {
+          '@type': names.SOURCE_TYPE,
+          names.SERIALIZED_SOURCE_KEY: pickler.dumps(transform.source)
+      }
+
+      try:
+        source_dict['metadata'] = {
+            'estimated_size_bytes': json_value.get_typed_value_descriptor(
+                transform.source.estimate_size())
+        }
+      except Exception:  # pylint: disable=broad-except
+        # Size estimation is best effort. So we log the error and continue.
+        logging.info(
+            'Could not estimate size of source %r due to an exception: %s',
+            transform.source, traceback.format_exc())
 
-      spec_dict[names.SERIALIZED_SOURCE_KEY] = pickler.dumps(transform.source)
-      spec_dict['@type'] = names.SOURCE_TYPE
-      source_dict['spec'] = spec_dict
       step.add_property(PropertyNames.SOURCE_STEP_INPUT,
                         source_dict)
     elif transform.source.format == 'text':