You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/15 17:23:38 UTC
[1/2] incubator-beam git commit: Closes #599
Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk d898d56ae -> 1305c108a
Closes #599
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1305c108
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1305c108
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1305c108
Branch: refs/heads/python-sdk
Commit: 1305c108a63cb1675fd9bc36fa990bb87033eaeb
Parents: d898d56 c9c31fd
Author: Robert Bradshaw <ro...@google.com>
Authored: Fri Jul 15 10:23:13 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Fri Jul 15 10:23:13 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/filebasedsource.py | 9 +--------
sdks/python/apache_beam/io/fileio.py | 14 ++++++++++++++
sdks/python/apache_beam/io/gcsio.py | 15 +++++++++++++++
sdks/python/apache_beam/io/gcsio_test.py | 8 ++++++++
sdks/python/apache_beam/io/range_trackers.py | 3 ++-
sdks/python/apache_beam/io/range_trackers_test.py | 3 +++
6 files changed, 43 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
[2/2] incubator-beam git commit: Fixes several issues related to
'filebasedsource'.
Posted by ro...@apache.org.
Fixes several issues related to 'filebasedsource'.
Adds a method 'fileio.ChannelFactory.size_in_bytes()' that can be used to determine the size of a single file.
Implements this method for 'ChannelFactory' implementations for GCS and local files.
Updates 'filebasedsource' to use this method when determining the size of files.
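Fixes a small bug in 'OffsetRangeTracker': the position computed for a fraction is now returned as an integer instead of the raw result of math.ceil.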
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c9c31fdf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c9c31fdf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c9c31fdf
Branch: refs/heads/python-sdk
Commit: c9c31fdffbe2365a8dedd3154726ab1c01cfa889
Parents: d898d56
Author: Chamikara Jayalath <ch...@apache.org>
Authored: Wed Jul 6 20:25:04 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Fri Jul 15 10:23:13 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/filebasedsource.py | 9 +--------
sdks/python/apache_beam/io/fileio.py | 14 ++++++++++++++
sdks/python/apache_beam/io/gcsio.py | 15 +++++++++++++++
sdks/python/apache_beam/io/gcsio_test.py | 8 ++++++++
sdks/python/apache_beam/io/range_trackers.py | 3 ++-
sdks/python/apache_beam/io/range_trackers_test.py | 3 +++
6 files changed, 43 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9c31fdf/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index c877e44..aa0820d 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -26,7 +26,6 @@ For an example implementation of ``FileBasedSource`` see ``avroio.AvroSource``.
"""
from multiprocessing.pool import ThreadPool
-import os
import range_trackers
from apache_beam.io import fileio
@@ -131,13 +130,7 @@ class FileBasedSource(iobase.BoundedSource):
def _estimate_sizes_in_parallel(file_names):
def _calculate_size_of_file(file_name):
- f = fileio.ChannelFactory.open(
- file_name, 'rb', 'application/octet-stream')
- try:
- f.seek(0, os.SEEK_END)
- return f.tell()
- finally:
- f.close()
+ return fileio.ChannelFactory.size_in_bytes(file_name)
return ThreadPool(MAX_NUM_THREADS_FOR_SIZE_ESTIMATION).map(
_calculate_size_of_file, file_names)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9c31fdf/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 31b6a93..f532077 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -255,6 +255,20 @@ class ChannelFactory(object):
else:
return glob.glob(path)
+ @staticmethod
+ def size_in_bytes(path):
+ """Returns the size of a file in bytes.
+
+ Args:
+ path: a string that gives the path of a single file.
+ """
+ if path.startswith('gs://'):
+ # pylint: disable=wrong-import-order, wrong-import-position
+ from apache_beam.io import gcsio
+ return gcsio.GcsIO().size(path)
+ else:
+ return os.path.getsize(path)
+
class _CompressionType(object):
"""Object representing single compression type."""
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9c31fdf/sdks/python/apache_beam/io/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py
index c61f251..10409c9 100644
--- a/sdks/python/apache_beam/io/gcsio.py
+++ b/sdks/python/apache_beam/io/gcsio.py
@@ -236,6 +236,21 @@ class GcsIO(object):
except IOError:
return False
+ @retry.with_exponential_backoff(
+ retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+ def size(self, path):
+ """Returns the size of a single GCS object.
+
+ This method does not perform glob expansion. Hence the given path must be
+ for a single GCS object.
+
+ Returns: size of the GCS object in bytes.
+ """
+ bucket, object_path = parse_gcs_path(path)
+ request = storage.StorageObjectsGetRequest(bucket=bucket,
+ object=object_path)
+ return self.client.objects.Get(request).size
+
class GcsBufferedReader(object):
"""A class for reading Google Cloud Storage files."""
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9c31fdf/sdks/python/apache_beam/io/gcsio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio_test.py b/sdks/python/apache_beam/io/gcsio_test.py
index eeabb1a..7b15ef3 100644
--- a/sdks/python/apache_beam/io/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcsio_test.py
@@ -189,6 +189,14 @@ class TestGCSIO(unittest.TestCase):
self.client = FakeGcsClient()
self.gcs = gcsio.GcsIO(self.client)
+ def test_size(self):
+ file_name = 'gs://gcsio-test/dummy_file'
+ file_size = 1234
+
+ self._insert_random_file(self.client, file_name, file_size)
+ self.assertTrue(self.gcs.exists(file_name))
+ self.assertEqual(1234, self.gcs.size(file_name))
+
def test_delete(self):
file_name = 'gs://gcsio-test/delete_me'
file_size = 1024
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9c31fdf/sdks/python/apache_beam/io/range_trackers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/range_trackers.py b/sdks/python/apache_beam/io/range_trackers.py
index c3481de..a736162 100644
--- a/sdks/python/apache_beam/io/range_trackers.py
+++ b/sdks/python/apache_beam/io/range_trackers.py
@@ -114,6 +114,7 @@ class OffsetRangeTracker(iobase.RangeTracker):
self._last_record_start = record_start
def try_split(self, split_offset):
+ assert isinstance(split_offset, (int, long))
with self._lock:
if self._stop_offset == OffsetRangeTracker.OFFSET_INFINITY:
logging.debug('refusing to split %r at %d: stop position unspecified',
@@ -163,7 +164,7 @@ class OffsetRangeTracker(iobase.RangeTracker):
raise Exception(
'get_position_for_fraction_consumed is not applicable for an '
'unbounded range')
- return (math.ceil(self.start_position() + fraction * (
+ return int(math.ceil(self.start_position() + fraction * (
self.stop_position() - self.start_position())))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9c31fdf/sdks/python/apache_beam/io/range_trackers_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/range_trackers_test.py b/sdks/python/apache_beam/io/range_trackers_test.py
index ceeccd5..77733d3 100644
--- a/sdks/python/apache_beam/io/range_trackers_test.py
+++ b/sdks/python/apache_beam/io/range_trackers_test.py
@@ -98,6 +98,9 @@ class OffsetRangeTrackerTest(unittest.TestCase):
def test_get_position_for_fraction_dense(self):
# Represents positions 3, 4, 5.
tracker = range_trackers.OffsetRangeTracker(3, 6)
+
+ # Position must be an integer type.
+ self.assertTrue(isinstance(tracker.position_at_fraction(0.0), (int, long)))
# [3, 3) represents 0.0 of [3, 6)
self.assertEqual(3, tracker.position_at_fraction(0.0))
# [3, 4) represents up to 1/3 of [3, 6)