You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2017/04/06 18:32:46 UTC
[1/2] beam git commit: [BEAM-1892] File size estimation process reporting
Repository: beam
Updated Branches:
refs/heads/master f87597e10 -> 37e4cc1b8
[BEAM-1892] File size estimation process reporting
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/378b3f5b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/378b3f5b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/378b3f5b
Branch: refs/heads/master
Commit: 378b3f5bf9886cc390e674a3e600a22ad2e5cb98
Parents: f87597e
Author: Sourabh Bajaj <so...@google.com>
Authored: Wed Apr 5 16:49:05 2017 -0700
Committer: Chamikara Jayalath <ch...@google.com>
Committed: Thu Apr 6 11:29:10 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/filebasedsource.py | 1 +
sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 2 +-
.../apache_beam/io/gcp/gcsfilesystem_test.py | 29 ++++++++++++++-
sdks/python/apache_beam/io/gcp/gcsio.py | 16 +++++++-
sdks/python/apache_beam/io/gcp/gcsio_test.py | 39 ++++++++++++++++++++
5 files changed, 83 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/378b3f5b/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 930d958..2e7043f 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -35,6 +35,7 @@ from apache_beam.transforms.display import DisplayDataItem
from apache_beam.utils.value_provider import ValueProvider
from apache_beam.utils.value_provider import StaticValueProvider
from apache_beam.utils.value_provider import check_accessible
+
MAX_NUM_THREADS_FOR_SIZE_ESTIMATION = 25
http://git-wip-us.apache.org/repos/asf/beam/blob/378b3f5b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
index 5aef0ab..d79630f 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -65,7 +65,7 @@ class GCSFileSystem(FileSystem):
"""
if pattern.endswith('/'):
pattern += '*'
- file_sizes = gcsio.GcsIO().size_of_files_in_glob(pattern)
+ file_sizes = gcsio.GcsIO().size_of_files_in_glob(pattern, limit)
metadata_list = [FileMetadata(path, size)
for path, size in file_sizes.iteritems()]
return MatchResult(pattern, metadata_list)
http://git-wip-us.apache.org/repos/asf/beam/blob/378b3f5b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
index 73a3893..5a1f10d 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
@@ -54,7 +54,31 @@ class GCSFileSystemTest(unittest.TestCase):
self.assertEqual(
set(match_result.metadata_list),
expected_results)
- gcsio_mock.size_of_files_in_glob.assert_called_once_with('gs://bucket/*')
+ gcsio_mock.size_of_files_in_glob.assert_called_once_with(
+ 'gs://bucket/*', None)
+
+ @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+ def test_match_multiples_limit(self, mock_gcsio):
+ # Prepare mocks.
+ gcsio_mock = mock.MagicMock()
+ limit = 1
+ gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+ gcsio_mock.size_of_files_in_glob.return_value = {
+ 'gs://bucket/file1': 1
+ }
+ expected_results = set([
+ FileMetadata('gs://bucket/file1', 1)
+ ])
+ file_system = gcsfilesystem.GCSFileSystem()
+ match_result = file_system.match(['gs://bucket/'], [limit])[0]
+ self.assertEqual(
+ set(match_result.metadata_list),
+ expected_results)
+ self.assertEqual(
+ len(match_result.metadata_list),
+ limit)
+ gcsio_mock.size_of_files_in_glob.assert_called_once_with(
+ 'gs://bucket/*', 1)
@mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
def test_match_multiples_error(self, mock_gcsio):
@@ -71,7 +95,8 @@ class GCSFileSystemTest(unittest.TestCase):
self.assertTrue(
error.exception.message.startswith('Match operation failed'))
self.assertEqual(error.exception.exception_details, expected_results)
- gcsio_mock.size_of_files_in_glob.assert_called_once_with('gs://bucket/*')
+ gcsio_mock.size_of_files_in_glob.assert_called_once_with(
+ 'gs://bucket/*', None)
@mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
def test_match_multiple_patterns(self, mock_gcsio):
http://git-wip-us.apache.org/repos/asf/beam/blob/378b3f5b/sdks/python/apache_beam/io/gcp/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 0a10094..c76c99d 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -29,6 +29,7 @@ import os
import Queue
import re
import threading
+import time
import traceback
from apache_beam.utils import retry
@@ -368,7 +369,7 @@ class GcsIO(object):
@retry.with_exponential_backoff(
retry_filter=retry.retry_on_server_errors_and_timeout_filter)
- def size_of_files_in_glob(self, pattern):
+ def size_of_files_in_glob(self, pattern, limit=None):
"""Returns the size of all the files in the glob as a dictionary
Args:
@@ -379,16 +380,29 @@ class GcsIO(object):
prefix = re.match('^[^[*?]*', name_pattern).group(0)
request = storage.StorageObjectsListRequest(bucket=bucket, prefix=prefix)
file_sizes = {}
+ counter = 0
+ start_time = time.time()
+ logging.info("Starting the size estimation of the input")
while True:
response = self.client.objects.List(request)
for item in response.items:
if fnmatch.fnmatch(item.name, name_pattern):
file_name = 'gs://%s/%s' % (item.bucket, item.name)
file_sizes[file_name] = item.size
+ counter += 1
+ if limit is not None and counter >= limit:
+ break
+ if counter % 10000 == 0:
+ logging.info("Finished computing size of: %s files", len(file_sizes))
if response.nextPageToken:
request.pageToken = response.nextPageToken
+ if limit is not None and len(file_sizes) >= limit:
+ break
else:
break
+ logging.info(
+ "Finished the size estimation of the input at %s files. " +\
+ "Estimation took %s seconds", counter, time.time() - start_time)
return file_sizes
http://git-wip-us.apache.org/repos/asf/beam/blob/378b3f5b/sdks/python/apache_beam/io/gcp/gcsio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index c028f0d..73d2213 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -747,6 +747,45 @@ class TestGCSIO(unittest.TestCase):
self.assertEqual(
self.gcs.size_of_files_in_glob(file_pattern), expected_file_sizes)
+ def test_size_of_files_in_glob_limited(self):
+ bucket_name = 'gcsio-test'
+ object_names = [
+ ('cow/cat/fish', 2),
+ ('cow/cat/blubber', 3),
+ ('cow/dog/blubber', 4),
+ ('apple/dog/blubber', 5),
+ ('apple/fish/blubber', 6),
+ ('apple/fish/blowfish', 7),
+ ('apple/fish/bambi', 8),
+ ('apple/fish/balloon', 9),
+ ('apple/fish/cat', 10),
+ ('apple/fish/cart', 11),
+ ('apple/fish/carl', 12),
+ ('apple/dish/bat', 13),
+ ('apple/dish/cat', 14),
+ ('apple/dish/carl', 15),
+ ]
+ for (object_name, size) in object_names:
+ file_name = 'gs://%s/%s' % (bucket_name, object_name)
+ self._insert_random_file(self.client, file_name, size)
+ test_cases = [
+ ('gs://gcsio-test/cow/*', [
+ ('cow/cat/fish', 2),
+ ('cow/cat/blubber', 3),
+ ('cow/dog/blubber', 4),
+ ]),
+ ('gs://gcsio-test/apple/fish/car?', [
+ ('apple/fish/cart', 11),
+ ('apple/fish/carl', 12),
+ ])
+ ]
+ # Check if limits are followed correctly
+ limit = 1
+ for file_pattern, expected_object_names in test_cases:
+ expected_num_items = min(len(expected_object_names), limit)
+ self.assertEqual(
+ len(self.gcs.glob(file_pattern, limit)), expected_num_items)
+
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestPipeStream(unittest.TestCase):
[2/2] beam git commit: This closes #2445
Posted by ch...@apache.org.
This closes #2445
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/37e4cc1b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/37e4cc1b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/37e4cc1b
Branch: refs/heads/master
Commit: 37e4cc1b876bd990d289b1cc2ec24b8d9e53c2d8
Parents: f87597e 378b3f5
Author: Chamikara Jayalath <ch...@google.com>
Authored: Thu Apr 6 11:31:24 2017 -0700
Committer: Chamikara Jayalath <ch...@google.com>
Committed: Thu Apr 6 11:31:24 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/filebasedsource.py | 1 +
sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 2 +-
.../apache_beam/io/gcp/gcsfilesystem_test.py | 29 ++++++++++++++-
sdks/python/apache_beam/io/gcp/gcsio.py | 16 +++++++-
sdks/python/apache_beam/io/gcp/gcsio_test.py | 39 ++++++++++++++++++++
5 files changed, 83 insertions(+), 4 deletions(-)
----------------------------------------------------------------------