You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/11/01 05:21:32 UTC
[2/2] incubator-beam git commit: Improvements related to size estimation.
Improvements related to size estimation.
Updates FileBasedSource so that size estimation of glob patterns that expand into a large number of files is done using sampling.
Updates Dataflow runner to set estimated sizes of sources when submitting jobs.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f083ed70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f083ed70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f083ed70
Branch: refs/heads/python-sdk
Commit: f083ed706454170782708ba8e5804f63e3c44f83
Parents: 13dd9ff
Author: chamikara@google.com <ch...@google.com>
Authored: Mon Oct 31 07:39:13 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Mon Oct 31 22:21:21 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/internal/json_value.py | 4 +--
sdks/python/apache_beam/io/filebasedsource.py | 22 ++++++++++++++-
.../apache_beam/io/filebasedsource_test.py | 28 ++++++++++++++++++++
.../apache_beam/runners/dataflow_runner.py | 25 ++++++++++++-----
4 files changed, 70 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f083ed70/sdks/python/apache_beam/internal/json_value.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/json_value.py b/sdks/python/apache_beam/internal/json_value.py
index b7f6cc0..5a3a286 100644
--- a/sdks/python/apache_beam/internal/json_value.py
+++ b/sdks/python/apache_beam/internal/json_value.py
@@ -20,7 +20,7 @@
from apitools.base.py import extra_types
-def _get_typed_value_descriptor(obj):
+def get_typed_value_descriptor(obj):
"""Converts a basic type into a @type/value dictionary.
Args:
@@ -80,7 +80,7 @@ def to_json_value(obj, with_type=False):
key=k, value=to_json_value(v, with_type=with_type)))
return extra_types.JsonValue(object_value=json_object)
elif with_type:
- return to_json_value(_get_typed_value_descriptor(obj), with_type=False)
+ return to_json_value(get_typed_value_descriptor(obj), with_type=False)
elif isinstance(obj, basestring):
return extra_types.JsonValue(string_value=obj)
elif isinstance(obj, bool):
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f083ed70/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 931628c..77d8840 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -25,6 +25,8 @@ for more details.
For an example implementation of ``FileBasedSource`` see ``avroio.AvroSource``.
"""
+import random
+
from multiprocessing.pool import ThreadPool
from apache_beam.internal import pickler
@@ -39,6 +41,9 @@ MAX_NUM_THREADS_FOR_SIZE_ESTIMATION = 25
class FileBasedSource(iobase.BoundedSource):
"""A ``BoundedSource`` for reading a file glob of a given type."""
+ MIN_NUMBER_OF_FILES_TO_STAT = 100
+ MIN_FRACTION_OF_FILES_TO_STAT = 0.01
+
def __init__(self,
file_pattern,
min_bundle_size=0,
@@ -142,7 +147,22 @@ class FileBasedSource(iobase.BoundedSource):
stop_position=stop_position)
def estimate_size(self):
- return self._get_concat_source().estimate_size()
+ file_names = [f for f in fileio.ChannelFactory.glob(self._pattern)]
+ if (len(file_names) <=
+ FileBasedSource.MIN_NUMBER_OF_FILES_TO_STAT):
+ return sum(self._estimate_sizes_in_parallel(file_names))
+ else:
+ # Estimating size of a random sample.
+ # TODO: better support distributions where file sizes are not
+ # approximately equal.
+ sample_size = max(FileBasedSource.MIN_NUMBER_OF_FILES_TO_STAT,
+ int(len(file_names) *
+ FileBasedSource.MIN_FRACTION_OF_FILES_TO_STAT))
+ sample = random.sample(file_names, sample_size)
+ estimate = self._estimate_sizes_in_parallel(sample)
+ return int(
+ sum(estimate) *
+ (float(len(file_names)) / len(sample)))
def read(self, range_tracker):
return self._get_concat_source().read(range_tracker)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f083ed70/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index 2c68d2e..d07c1df 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -20,6 +20,7 @@ import cStringIO as StringIO
import gzip
import logging
import math
+import random
import os
import tempfile
import unittest
@@ -256,6 +257,33 @@ class TestFileBasedSource(unittest.TestCase):
fbs = LineSource(pattern)
self.assertEquals(17 * 6, fbs.estimate_size())
+ def test_estimate_size_with_sampling_same_size(self):
+ num_files = 2 * FileBasedSource.MIN_NUMBER_OF_FILES_TO_STAT
+ pattern, _ = write_pattern([10] * num_files)
+ # Each line will be of length 6 since write_pattern() uses
+ # ('line' + line number + '\n') as data.
+ self.assertEqual(
+ 6 * 10 * num_files, FileBasedSource(pattern).estimate_size())
+
+ def test_estimate_size_with_sampling_different_sizes(self):
+ num_files = 2 * FileBasedSource.MIN_NUMBER_OF_FILES_TO_STAT
+
+ # Most lines will be of length 8 (shorter line numbers give 6 or 7) since
+ # write_pattern() uses ('line' + line number + '\n') as data.
+ base_size = 500
+ variance = 5
+
+ sizes = []
+ for _ in xrange(num_files):
+ sizes.append(int(random.uniform(base_size - variance,
+ base_size + variance)))
+ pattern, _ = write_pattern(sizes)
+ tolerance = 0.05
+ self.assertAlmostEqual(
+ base_size * 8 * num_files,
+ FileBasedSource(pattern).estimate_size(),
+ delta=base_size * 8 * num_files * tolerance)
+
def test_splits_into_subranges(self):
pattern, expected_data = write_pattern([5, 9, 6])
assert len(expected_data) == 20
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f083ed70/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index a387332..1d0398c 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -25,10 +25,11 @@ import base64
import logging
import threading
import time
-
+import traceback
from apache_beam import coders
from apache_beam import pvalue
+from apache_beam.internal import json_value
from apache_beam.internal import pickler
from apache_beam.pvalue import PCollectionView
from apache_beam.runners.runner import PipelineResult
@@ -491,12 +492,24 @@ class DataflowPipelineRunner(PipelineRunner):
if not hasattr(transform.source, 'format'):
# If a format is not set, we assume the source to be a custom source.
- source_dict = dict()
- spec_dict = dict()
+ source_dict = {}
+
+ source_dict['spec'] = {
+ '@type': names.SOURCE_TYPE,
+ names.SERIALIZED_SOURCE_KEY: pickler.dumps(transform.source)
+ }
+
+ try:
+ source_dict['metadata'] = {
+ 'estimated_size_bytes': json_value.get_typed_value_descriptor(
+ transform.source.estimate_size())
+ }
+ except Exception: # pylint: disable=broad-except
+ # Size estimation is best effort, so we log the error and continue.
+ logging.info(
+ 'Could not estimate size of source %r due to an exception: %s',
+ transform.source, traceback.format_exc())
- spec_dict[names.SERIALIZED_SOURCE_KEY] = pickler.dumps(transform.source)
- spec_dict['@type'] = names.SOURCE_TYPE
- source_dict['spec'] = spec_dict
step.add_property(PropertyNames.SOURCE_STEP_INPUT,
source_dict)
elif transform.source.format == 'text':