You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/12/03 00:13:35 UTC
[1/2] incubator-beam git commit: Do not need to list all files in GCS for validation. Add limit field to fileIO
Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk 2363ee510 -> fd6a52c15
Do not need to list all files in GCS for validation. Add limit field to fileIO
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/16886904
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/16886904
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/16886904
Branch: refs/heads/python-sdk
Commit: 16886904df9fd1d3f92e1f7aabd134a28d6c1c00
Parents: 2363ee5
Author: Sourabh Bajaj <so...@google.com>
Authored: Fri Dec 2 13:56:42 2016 -0800
Committer: Sourabh Bajaj <so...@google.com>
Committed: Fri Dec 2 13:56:42 2016 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/io/filebasedsource.py | 3 ++-
sdks/python/apache_beam/io/fileio.py | 7 ++++---
sdks/python/apache_beam/io/gcsio.py | 6 ++++--
sdks/python/apache_beam/io/gcsio_test.py | 7 +++++++
4 files changed, 17 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/16886904/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 14c2b06..8921801 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -175,7 +175,8 @@ class FileBasedSource(iobase.BoundedSource):
def _validate(self):
"""Validate if there are actual files in the specified glob pattern
"""
- if len(fileio.ChannelFactory.glob(self._pattern)) <= 0:
+ # Limit the responses as we only want to check if something exists
+ if len(fileio.ChannelFactory.glob(self._pattern, limit=1)) <= 0:
raise IOError(
'No files found based on the file pattern %s' % self._pattern)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/16886904/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index c71a730..82e7813 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -588,11 +588,12 @@ class ChannelFactory(object):
raise IOError(err)
@staticmethod
- def glob(path):
+ def glob(path, limit=None):
if path.startswith('gs://'):
- return gcsio.GcsIO().glob(path)
+ return gcsio.GcsIO().glob(path, limit)
else:
- return glob.glob(path)
+ files = glob.glob(path)
+ return files[:limit]
@staticmethod
def size_in_bytes(path):
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/16886904/sdks/python/apache_beam/io/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py
index 9adb946..748465f 100644
--- a/sdks/python/apache_beam/io/gcsio.py
+++ b/sdks/python/apache_beam/io/gcsio.py
@@ -142,7 +142,7 @@ class GcsIO(object):
@retry.with_exponential_backoff(
retry_filter=retry.retry_on_server_errors_and_timeout_filter)
- def glob(self, pattern):
+ def glob(self, pattern, limit=None):
"""Return the GCS path names matching a given path name pattern.
Path name patterns are those recognized by fnmatch.fnmatch(). The path
@@ -166,9 +166,11 @@ class GcsIO(object):
object_paths.append('gs://%s/%s' % (item.bucket, item.name))
if response.nextPageToken:
request.pageToken = response.nextPageToken
+ if limit is not None and len(object_paths) >= limit:
+ break
else:
break
- return object_paths
+ return object_paths[:limit]
@retry.with_exponential_backoff(
retry_filter=retry.retry_on_server_errors_and_timeout_filter)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/16886904/sdks/python/apache_beam/io/gcsio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio_test.py b/sdks/python/apache_beam/io/gcsio_test.py
index 9d44e17..5af13c6 100644
--- a/sdks/python/apache_beam/io/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcsio_test.py
@@ -652,6 +652,13 @@ class TestGCSIO(unittest.TestCase):
self.assertEqual(
set(self.gcs.glob(file_pattern)), set(expected_file_names))
+ # Check if limits are followed correctly
+ limit = 3
+ for file_pattern, expected_object_names in test_cases:
+ expected_num_items = min(len(expected_object_names), limit)
+ self.assertEqual(
+ len(self.gcs.glob(file_pattern, limit)), expected_num_items)
+
def test_size_of_files_in_glob(self):
bucket_name = 'gcsio-test'
object_names = [
[2/2] incubator-beam git commit: Closes #1431
Posted by dh...@apache.org.
Closes #1431
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fd6a52c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fd6a52c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fd6a52c1
Branch: refs/heads/python-sdk
Commit: fd6a52c15df5741d6b6661ea98c680a94775f7f9
Parents: 2363ee5 1688690
Author: Dan Halperin <dh...@google.com>
Authored: Fri Dec 2 16:13:28 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Dec 2 16:13:28 2016 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/io/filebasedsource.py | 3 ++-
sdks/python/apache_beam/io/fileio.py | 7 ++++---
sdks/python/apache_beam/io/gcsio.py | 6 ++++--
sdks/python/apache_beam/io/gcsio_test.py | 7 +++++++
4 files changed, 17 insertions(+), 6 deletions(-)
----------------------------------------------------------------------