Posted to commits@beam.apache.org by pa...@apache.org on 2022/05/02 23:27:42 UTC
[beam] branch master updated: Merge pull request #17380 from [BEAM-14314][BEAM-9532] Add last_updated field in filesystem.FileMetaData
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0daef62a7bd Merge pull request #17380 from [BEAM-14314][BEAM-9532] Add last_updated field in filesystem.FileMetaData
0daef62a7bd is described below
commit 0daef62a7bd993b13064de80588e343ee764e004
Author: Yi Hu <ya...@google.com>
AuthorDate: Mon May 2 19:27:36 2022 -0400
Merge pull request #17380 from [BEAM-14314][BEAM-9532] Add last_updated field in filesystem.FileMetaData
* [BEAM-14314] Add last_updated field in filesystem.FileMetaData
* Add last_updated_in_seconds field in FileMetaData and use it in match
* Add metadata method for FileSystem implementations
* Fix precommit
* Fix style and docstring
* increase test coverage for metadata methods
* Address comments and last_updated metadata improvements
* Fix naming
* Implement last_updated in hadoop filesystem
* [BEAM-9532] fix issue with s3io_test last updated
* Fix pylint and add back removed coverage
* Fix hadoopfilesystem unit test
* Address comments and fix flaky hadoopfilesystem unit test
* Add comments to time assertion choice in test
* Fix method _status renaming leftover
* Also fix method _status renaming leftover in tests
---
.../apache_beam/io/aws/clients/s3/fake_client.py | 6 +-
sdks/python/apache_beam/io/aws/s3filesystem.py | 25 ++++-
.../python/apache_beam/io/aws/s3filesystem_test.py | 40 +++++---
sdks/python/apache_beam/io/aws/s3io.py | 98 +++++++++++++------
sdks/python/apache_beam/io/aws/s3io_test.py | 21 +++-
.../apache_beam/io/azure/blobstoragefilesystem.py | 25 ++++-
.../io/azure/blobstoragefilesystem_test.py | 45 ++++++---
sdks/python/apache_beam/io/azure/blobstorageio.py | 108 +++++++++++++--------
sdks/python/apache_beam/io/filesystem.py | 50 ++++++++--
sdks/python/apache_beam/io/filesystem_test.py | 3 +
sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 24 ++++-
.../apache_beam/io/gcp/gcsfilesystem_test.py | 31 +++---
sdks/python/apache_beam/io/gcp/gcsio.py | 105 +++++++++++++-------
sdks/python/apache_beam/io/gcp/gcsio_test.py | 20 ++++
sdks/python/apache_beam/io/hadoopfilesystem.py | 52 ++++++++--
.../python/apache_beam/io/hadoopfilesystem_test.py | 25 +++--
sdks/python/apache_beam/io/localfilesystem.py | 18 +++-
sdks/python/apache_beam/io/localfilesystem_test.py | 10 +-
18 files changed, 522 insertions(+), 184 deletions(-)
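For readers skimming the patch: a minimal sketch (not part of the commit) of how the new field surfaces through the public API once this lands. FileSystems.match() now populates last_updated_in_seconds on each FileMetadata, defaulting to 0.0 where a backend does not report it; the bucket path below is hypothetical.

    # Sketch only: assumes a reachable filesystem and a hypothetical bucket path.
    from apache_beam.io.filesystems import FileSystems

    match_result = FileSystems.match(['gs://my-bucket/my-prefix*'])[0]
    for metadata in match_result.metadata_list:
        # last_updated_in_seconds is the new optional field (0.0 if unreported).
        print(metadata.path, metadata.size_in_bytes, metadata.last_updated_in_seconds)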
diff --git a/sdks/python/apache_beam/io/aws/clients/s3/fake_client.py b/sdks/python/apache_beam/io/aws/clients/s3/fake_client.py
index b53ff32825a..e2d34a11a46 100644
--- a/sdks/python/apache_beam/io/aws/clients/s3/fake_client.py
+++ b/sdks/python/apache_beam/io/aws/clients/s3/fake_client.py
@@ -20,6 +20,8 @@
import datetime
import time
+import pytz
+
from apache_beam.io.aws.clients.s3 import messages
@@ -39,8 +41,8 @@ class FakeFile(object):
def get_metadata(self):
last_modified_datetime = None
if self.last_modified:
- last_modified_datetime = datetime.datetime.utcfromtimestamp(
- self.last_modified)
+ last_modified_datetime = datetime.datetime.fromtimestamp(
+ self.last_modified, pytz.utc)
return messages.Item(
self.etag,
diff --git a/sdks/python/apache_beam/io/aws/s3filesystem.py b/sdks/python/apache_beam/io/aws/s3filesystem.py
index 8a5e94e3fc7..68d8ea7e313 100644
--- a/sdks/python/apache_beam/io/aws/s3filesystem.py
+++ b/sdks/python/apache_beam/io/aws/s3filesystem.py
@@ -128,9 +128,9 @@ class S3FileSystem(FileSystem):
``BeamIOError``: if listing fails, but not if no files were found.
"""
try:
- for path, size in \
- s3io.S3IO(options=self._options).list_prefix(dir_or_prefix).items():
- yield FileMetadata(path, size)
+ for path, (size, updated) in s3io.S3IO(options=self._options) \
+ .list_prefix(dir_or_prefix, with_metadata=True).items():
+ yield FileMetadata(path, size, updated)
except Exception as e: # pylint: disable=broad-except
raise BeamIOError("List operation failed", {dir_or_prefix: e})
@@ -281,6 +281,25 @@ class S3FileSystem(FileSystem):
except Exception as e: # pylint: disable=broad-except
raise BeamIOError("Checksum operation failed", {path: e})
+ def metadata(self, path):
+ """Fetch metadata fields of a file on the FileSystem.
+
+ Args:
+ path: string path of a file.
+
+ Returns:
+ :class:`~apache_beam.io.filesystem.FileMetadata`.
+
+ Raises:
+ ``BeamIOError``: if path isn't a file or doesn't exist.
+ """
+ try:
+ file_metadata = s3io.S3IO(options=self._options)._status(path)
+ return FileMetadata(
+ path, file_metadata['size'], file_metadata['last_updated'])
+ except Exception as e: # pylint: disable=broad-except
+ raise BeamIOError("Metadata operation failed", {path: e})
+
def delete(self, paths):
"""Deletes files or directories at the provided paths.
Directories will be deleted recursively.
diff --git a/sdks/python/apache_beam/io/aws/s3filesystem_test.py b/sdks/python/apache_beam/io/aws/s3filesystem_test.py
index eba67476f13..dd7209f0cbb 100644
--- a/sdks/python/apache_beam/io/aws/s3filesystem_test.py
+++ b/sdks/python/apache_beam/io/aws/s3filesystem_test.py
@@ -78,22 +78,36 @@ class S3FileSystemTest(unittest.TestCase):
with self.assertRaises(ValueError):
self.fs.split('/no/s3/prefix')
+ @mock.patch('apache_beam.io.aws.s3filesystem.s3io')
+ def test_match_single(self, unused_mock_arg):
+ # Prepare mocks.
+ s3io_mock = mock.MagicMock()
+ s3filesystem.s3io.S3IO = lambda options: s3io_mock # type: ignore[misc]
+ s3io_mock._status.return_value = {'size': 1, 'last_updated': 9999999.0}
+ expected_results = [FileMetadata('s3://bucket/file1', 1, 9999999.0)]
+ match_result = self.fs.match(['s3://bucket/file1'])[0]
+
+ self.assertEqual(match_result.metadata_list, expected_results)
+ s3io_mock._status.assert_called_once_with('s3://bucket/file1')
+
@mock.patch('apache_beam.io.aws.s3filesystem.s3io')
def test_match_multiples(self, unused_mock_arg):
# Prepare mocks.
s3io_mock = mock.MagicMock()
s3filesystem.s3io.S3IO = lambda options: s3io_mock # type: ignore[misc]
s3io_mock.list_prefix.return_value = {
- 's3://bucket/file1': 1, 's3://bucket/file2': 2
+ 's3://bucket/file1': (1, 9999999.0),
+ 's3://bucket/file2': (2, 8888888.0)
}
expected_results = set([
- FileMetadata('s3://bucket/file1', 1),
- FileMetadata('s3://bucket/file2', 2)
+ FileMetadata('s3://bucket/file1', 1, 9999999.0),
+ FileMetadata('s3://bucket/file2', 2, 8888888.0)
])
match_result = self.fs.match(['s3://bucket/'])[0]
self.assertEqual(set(match_result.metadata_list), expected_results)
- s3io_mock.list_prefix.assert_called_once_with('s3://bucket/')
+ s3io_mock.list_prefix.assert_called_once_with(
+ 's3://bucket/', with_metadata=True)
@mock.patch('apache_beam.io.aws.s3filesystem.s3io')
def test_match_multiples_limit(self, unused_mock_arg):
@@ -101,12 +115,13 @@ class S3FileSystemTest(unittest.TestCase):
s3io_mock = mock.MagicMock()
limit = 1
s3filesystem.s3io.S3IO = lambda options: s3io_mock # type: ignore[misc]
- s3io_mock.list_prefix.return_value = {'s3://bucket/file1': 1}
- expected_results = set([FileMetadata('s3://bucket/file1', 1)])
+ s3io_mock.list_prefix.return_value = {'s3://bucket/file1': (1, 99999.0)}
+ expected_results = set([FileMetadata('s3://bucket/file1', 1, 99999.0)])
match_result = self.fs.match(['s3://bucket/'], [limit])[0]
self.assertEqual(set(match_result.metadata_list), expected_results)
self.assertEqual(len(match_result.metadata_list), limit)
- s3io_mock.list_prefix.assert_called_once_with('s3://bucket/')
+ s3io_mock.list_prefix.assert_called_once_with(
+ 's3://bucket/', with_metadata=True)
@mock.patch('apache_beam.io.aws.s3filesystem.s3io')
def test_match_multiples_error(self, unused_mock_arg):
@@ -120,7 +135,8 @@ class S3FileSystemTest(unittest.TestCase):
self.fs.match(['s3://bucket/'])
self.assertIn('Match operation failed', str(error.exception))
- s3io_mock.list_prefix.assert_called_once_with('s3://bucket/')
+ s3io_mock.list_prefix.assert_called_once_with(
+ 's3://bucket/', with_metadata=True)
@mock.patch('apache_beam.io.aws.s3filesystem.s3io')
def test_match_multiple_patterns(self, unused_mock_arg):
@@ -129,14 +145,14 @@ class S3FileSystemTest(unittest.TestCase):
s3filesystem.s3io.S3IO = lambda options: s3io_mock # type: ignore[misc]
s3io_mock.list_prefix.side_effect = [
{
- 's3://bucket/file1': 1
+ 's3://bucket/file1': (1, 99999.0)
},
{
- 's3://bucket/file2': 2
+ 's3://bucket/file2': (2, 88888.0)
},
]
- expected_results = [[FileMetadata('s3://bucket/file1', 1)],
- [FileMetadata('s3://bucket/file2', 2)]]
+ expected_results = [[FileMetadata('s3://bucket/file1', 1, 99999.0)],
+ [FileMetadata('s3://bucket/file2', 2, 88888.0)]]
result = self.fs.match(['s3://bucket/file1*', 's3://bucket/file2*'])
self.assertEqual([mr.metadata_list for mr in result], expected_results)
diff --git a/sdks/python/apache_beam/io/aws/s3io.py b/sdks/python/apache_beam/io/aws/s3io.py
index ac0c92f0d37..d8bbfe164be 100644
--- a/sdks/python/apache_beam/io/aws/s3io.py
+++ b/sdks/python/apache_beam/io/aws/s3io.py
@@ -101,23 +101,28 @@ class S3IO(object):
@retry.with_exponential_backoff(
retry_filter=retry.retry_on_server_errors_and_timeout_filter)
- def list_prefix(self, path):
+ def list_prefix(self, path, with_metadata=False):
"""Lists files matching the prefix.
Args:
path: S3 file path pattern in the form s3://<bucket>/[name].
+ with_metadata: Experimental. Specify whether to return file metadata.
Returns:
- Dictionary of file name -> size.
+ If ``with_metadata`` is False: dict of file name -> size; if
+ ``with_metadata`` is True: dict of file name -> tuple(size, timestamp).
"""
bucket, prefix = parse_s3_path(path, object_optional=True)
request = messages.ListRequest(bucket=bucket, prefix=prefix)
- file_sizes = {}
+ file_info = {}
counter = 0
start_time = time.time()
- logging.info("Starting the size estimation of the input")
+ if with_metadata:
+ logging.info("Starting the file information of the input")
+ else:
+ logging.info("Starting the size estimation of the input")
while True:
#The list operation will raise an exception
@@ -134,10 +139,19 @@ class S3IO(object):
for item in response.items:
file_name = 's3://%s/%s' % (bucket, item.key)
- file_sizes[file_name] = item.size
+ if with_metadata:
+ file_info[file_name] = (
+ item.size, self._updated_to_seconds(item.last_modified))
+ else:
+ file_info[file_name] = item.size
counter += 1
if counter % 10000 == 0:
- logging.info("Finished computing size of: %s files", len(file_sizes))
+ if with_metadata:
+ logging.info(
+ "Finished computing file information of: %s files",
+ len(file_info))
+ else:
+ logging.info("Finished computing size of: %s files", len(file_info))
if response.next_token:
request.continuation_token = response.next_token
else:
@@ -148,20 +162,15 @@ class S3IO(object):
counter,
time.time() - start_time)
- return file_sizes
+ return file_info
- @retry.with_exponential_backoff(
- retry_filter=retry.retry_on_server_errors_and_timeout_filter)
def checksum(self, path):
"""Looks up the checksum of an S3 object.
Args:
path: S3 file path pattern in the form s3://<bucket>/<name>.
"""
- bucket, object_path = parse_s3_path(path)
- request = messages.GetRequest(bucket, object_path)
- item = self.client.get_object_metadata(request)
- return item.etag
+ return self._s3_object(path).etag
@retry.with_exponential_backoff(
retry_filter=retry.retry_on_server_errors_and_timeout_filter)
@@ -400,8 +409,6 @@ class S3IO(object):
paths = self.list_prefix(root)
return self.delete_files(paths)
- @retry.with_exponential_backoff(
- retry_filter=retry.retry_on_server_errors_and_timeout_filter)
def size(self, path):
"""Returns the size of a single S3 object.
@@ -410,10 +417,7 @@ class S3IO(object):
Returns: size of the S3 object in bytes.
"""
- bucket, object_path = parse_s3_path(path)
- request = messages.GetRequest(bucket, object_path)
- item = self.client.get_object_metadata(request)
- return item.size
+ return self._s3_object(path).size
# We intentionally do not decorate this method with a retry, since the
# underlying copy and delete operations are already idempotent operations
@@ -428,8 +432,6 @@ class S3IO(object):
self.copy(src, dest)
self.delete(src)
- @retry.with_exponential_backoff(
- retry_filter=retry.retry_on_server_errors_and_timeout_filter)
def last_updated(self, path):
"""Returns the last updated epoch time of a single S3 object.
@@ -438,12 +440,7 @@ class S3IO(object):
Returns: last updated time of the S3 object in seconds.
"""
- bucket, object = parse_s3_path(path)
- request = messages.GetRequest(bucket, object)
- datetime = self.client.get_object_metadata(request).last_modified
- return (
- time.mktime(datetime.timetuple()) - time.timezone +
- datetime.microsecond / 1000000.0)
+ return self._updated_to_seconds(self._s3_object(path).last_modified)
def exists(self, path):
"""Returns whether the given S3 object exists.
@@ -451,10 +448,8 @@ class S3IO(object):
Args:
path: S3 file path pattern in the form s3://<bucket>/<name>.
"""
- bucket, object = parse_s3_path(path)
- request = messages.GetRequest(bucket, object)
try:
- self.client.get_object_metadata(request)
+ self._s3_object(path)
return True
except messages.S3ClientError as e:
if e.code == 404:
@@ -464,6 +459,49 @@ class S3IO(object):
# We re-raise all other exceptions
raise
+ def _status(self, path):
+ """For internal use only; no backwards-compatibility guarantees.
+
+ Returns supported fields (checksum, last_updated, size) of a single object
+ as a dict at once.
+
+ This method does not perform glob expansion. Hence the given path must be
+ for a single S3 object.
+
+ Returns: dict of fields of the S3 object.
+ """
+ s3_object = self._s3_object(path)
+ file_status = {}
+ if hasattr(s3_object, 'etag'):
+ file_status['checksum'] = s3_object.etag
+ if hasattr(s3_object, 'last_modified'):
+ file_status['last_updated'] = self._updated_to_seconds(
+ s3_object.last_modified)
+ if hasattr(s3_object, 'size'):
+ file_status['size'] = s3_object.size
+ return file_status
+
+ @retry.with_exponential_backoff(
+ retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+ def _s3_object(self, path):
+ """Returns a S3 object metadata for the given path
+
+ This method does not perform glob expansion. Hence the given path must be
+ for a single S3 object.
+
+ Returns: S3 object metadata.
+ """
+ bucket, object = parse_s3_path(path)
+ request = messages.GetRequest(bucket, object)
+ return self.client.get_object_metadata(request)
+
+ @staticmethod
+ def _updated_to_seconds(updated):
+ """Helper function transform the updated field of response to seconds"""
+ return (
+ time.mktime(updated.timetuple()) - time.timezone +
+ updated.microsecond / 1000000.0)
+
def rename_files(self, src_dest_pairs):
"""Renames the given S3 objects from src to dest.
diff --git a/sdks/python/apache_beam/io/aws/s3io_test.py b/sdks/python/apache_beam/io/aws/s3io_test.py
index cbc05ca25d3..ffab9572707 100644
--- a/sdks/python/apache_beam/io/aws/s3io_test.py
+++ b/sdks/python/apache_beam/io/aws/s3io_test.py
@@ -113,13 +113,13 @@ class TestS3IO(unittest.TestCase):
self.aws.delete(file_name)
def test_last_updated(self):
- self.skipTest('BEAM-9532 fix issue with s3 last updated')
file_name = self.TEST_DATA_PATH + 'dummy_file'
file_size = 1234
self._insert_random_file(self.client, file_name, file_size)
self.assertTrue(self.aws.exists(file_name))
-
+ # The time difference should be tiny for the mock client.
+ # A loose tolerance accounts for runs against a real S3 client.
tolerance = 5 * 60 # 5 mins
result = self.aws.last_updated(file_name)
self.assertAlmostEqual(result, time.time(), delta=tolerance)
@@ -128,7 +128,6 @@ class TestS3IO(unittest.TestCase):
self.aws.delete(file_name)
def test_checksum(self):
-
file_name = self.TEST_DATA_PATH + 'checksum'
file_size = 1024
file_ = self._insert_random_file(self.client, file_name, file_size)
@@ -149,6 +148,22 @@ class TestS3IO(unittest.TestCase):
# Clean up
self.aws.delete(file_name)
+ def test_file_status(self):
+ file_name = self.TEST_DATA_PATH + 'metadata'
+ file_size = 1024
+ self._insert_random_file(self.client, file_name, file_size)
+ file_checksum = self.aws.checksum(file_name)
+ file_timestamp = self.aws.last_updated(file_name)
+
+ file_status = self.aws._status(file_name)
+
+ self.assertEqual(file_status['size'], file_size)
+ self.assertEqual(file_status['checksum'], file_checksum)
+ self.assertEqual(file_status['last_updated'], file_timestamp)
+
+ # Clean up
+ self.aws.delete(file_name)
+
def test_copy(self):
src_file_name = self.TEST_DATA_PATH + 'source'
dest_file_name = self.TEST_DATA_PATH + 'dest'
diff --git a/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py b/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
index 28a5a5841b3..19ba078cbbe 100644
--- a/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
+++ b/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
@@ -118,9 +118,9 @@ class BlobStorageFileSystem(FileSystem):
``BeamIOError``: if listing fails, but not if no files were found.
"""
try:
- for path, size in \
- blobstorageio.BlobStorageIO().list_prefix(dir_or_prefix).items():
- yield FileMetadata(path, size)
+ for path, (size, updated) in blobstorageio.BlobStorageIO() \
+ .list_prefix(dir_or_prefix, with_metadata=True).items():
+ yield FileMetadata(path, size, updated)
except Exception as e: # pylint: disable=broad-except
raise BeamIOError("List operation failed", {dir_or_prefix: e})
@@ -276,6 +276,25 @@ class BlobStorageFileSystem(FileSystem):
except Exception as e: # pylint: disable=broad-except
raise BeamIOError("Checksum operation failed", {path, e})
+ def metadata(self, path):
+ """Fetch metadata fields of a file on the FileSystem.
+
+ Args:
+ path: string path of a file.
+
+ Returns:
+ :class:`~apache_beam.io.filesystem.FileMetadata`.
+
+ Raises:
+ ``BeamIOError``: if path isn't a file or doesn't exist.
+ """
+ try:
+ file_metadata = blobstorageio.BlobStorageIO()._status(path)
+ return FileMetadata(
+ path, file_metadata['size'], file_metadata['last_updated'])
+ except Exception as e: # pylint: disable=broad-except
+ raise BeamIOError("Metadata operation failed", {path: e})
+
def delete(self, paths):
"""Deletes files or directories at the provided paths.
Directories will be deleted recursively.
diff --git a/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py b/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py
index c67dbe9c5c6..241cc72a15d 100644
--- a/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py
+++ b/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py
@@ -83,6 +83,25 @@ class BlobStorageFileSystemTest(unittest.TestCase):
with self.assertRaises(ValueError):
self.fs.split('/no/azfs/prefix')
+ @mock.patch('apache_beam.io.azure.blobstoragefilesystem.blobstorageio')
+ def test_match_single(self, unused_mock_blobstorageio):
+ # Prepare mocks.
+ blobstorageio_mock = mock.MagicMock()
+ blobstoragefilesystem.blobstorageio.BlobStorageIO = \
+ lambda: blobstorageio_mock
+ blobstorageio_mock.exists.return_value = True
+ blobstorageio_mock._status.return_value = {
+ 'size': 1, 'last_updated': 99999.0
+ }
+ expected_results = [
+ FileMetadata('azfs://storageaccount/container/file1', 1, 99999.0)
+ ]
+ match_result = self.fs.match(['azfs://storageaccount/container/file1'])[0]
+
+ self.assertEqual(match_result.metadata_list, expected_results)
+ blobstorageio_mock._status.assert_called_once_with(
+ 'azfs://storageaccount/container/file1')
+
@mock.patch('apache_beam.io.azure.blobstoragefilesystem.blobstorageio')
def test_match_multiples(self, unused_mock_blobstorageio):
# Prepare mocks.
@@ -90,18 +109,18 @@ class BlobStorageFileSystemTest(unittest.TestCase):
blobstoragefilesystem.blobstorageio.BlobStorageIO = \
lambda: blobstorageio_mock
blobstorageio_mock.list_prefix.return_value = {
- 'azfs://storageaccount/container/file1': 1,
- 'azfs://storageaccount/container/file2': 2,
+ 'azfs://storageaccount/container/file1': (1, 99999.0),
+ 'azfs://storageaccount/container/file2': (2, 88888.0)
}
expected_results = set([
- FileMetadata('azfs://storageaccount/container/file1', 1),
- FileMetadata('azfs://storageaccount/container/file2', 2),
+ FileMetadata('azfs://storageaccount/container/file1', 1, 99999.0),
+ FileMetadata('azfs://storageaccount/container/file2', 2, 88888.0),
])
match_result = self.fs.match(['azfs://storageaccount/container/'])[0]
self.assertEqual(set(match_result.metadata_list), expected_results)
blobstorageio_mock.list_prefix.assert_called_once_with(
- 'azfs://storageaccount/container/')
+ 'azfs://storageaccount/container/', with_metadata=True)
@mock.patch('apache_beam.io.azure.blobstoragefilesystem.blobstorageio')
def test_match_multiples_limit(self, unused_mock_blobstorageio):
@@ -111,16 +130,16 @@ class BlobStorageFileSystemTest(unittest.TestCase):
blobstoragefilesystem.blobstorageio.BlobStorageIO = \
lambda: blobstorageio_mock
blobstorageio_mock.list_prefix.return_value = {
- 'azfs://storageaccount/container/file1': 1
+ 'azfs://storageaccount/container/file1': (1, 99999.0)
}
expected_results = set(
- [FileMetadata('azfs://storageaccount/container/file1', 1)])
+ [FileMetadata('azfs://storageaccount/container/file1', 1, 99999.0)])
match_result = self.fs.match(['azfs://storageaccount/container/'],
[limit])[0]
self.assertEqual(set(match_result.metadata_list), expected_results)
self.assertEqual(len(match_result.metadata_list), limit)
blobstorageio_mock.list_prefix.assert_called_once_with(
- 'azfs://storageaccount/container/')
+ 'azfs://storageaccount/container/', with_metadata=True)
@mock.patch('apache_beam.io.azure.blobstoragefilesystem.blobstorageio')
def test_match_multiples_error(self, unused_mock_blobstorageio):
@@ -139,7 +158,7 @@ class BlobStorageFileSystemTest(unittest.TestCase):
str(error.exception.exception_details),
r'azfs://storageaccount/container/.*%s' % exception)
blobstorageio_mock.list_prefix.assert_called_once_with(
- 'azfs://storageaccount/container/')
+ 'azfs://storageaccount/container/', with_metadata=True)
@mock.patch('apache_beam.io.azure.blobstoragefilesystem.blobstorageio')
def test_match_multiple_patterns(self, unused_mock_blobstorageio):
@@ -149,15 +168,15 @@ class BlobStorageFileSystemTest(unittest.TestCase):
lambda: blobstorageio_mock
blobstorageio_mock.list_prefix.side_effect = [
{
- 'azfs://storageaccount/container/file1': 1
+ 'azfs://storageaccount/container/file1': (1, 99999.0)
},
{
- 'azfs://storageaccount/container/file2': 2
+ 'azfs://storageaccount/container/file2': (2, 88888.0)
},
]
expected_results = [
- [FileMetadata('azfs://storageaccount/container/file1', 1)],
- [FileMetadata('azfs://storageaccount/container/file2', 2)]
+ [FileMetadata('azfs://storageaccount/container/file1', 1, 99999.0)],
+ [FileMetadata('azfs://storageaccount/container/file2', 2, 88888.0)]
]
result = self.fs.match([
'azfs://storageaccount/container/file1*',
diff --git a/sdks/python/apache_beam/io/azure/blobstorageio.py b/sdks/python/apache_beam/io/azure/blobstorageio.py
index d27b81ef852..290b5228198 100644
--- a/sdks/python/apache_beam/io/azure/blobstorageio.py
+++ b/sdks/python/apache_beam/io/azure/blobstorageio.py
@@ -317,8 +317,6 @@ class BlobStorageIO(object):
return results
- @retry.with_exponential_backoff(
- retry_filter=retry.retry_on_beam_io_error_filter)
def exists(self, path):
"""Returns whether the given Azure Blob Storage blob exists.
@@ -326,10 +324,8 @@ class BlobStorageIO(object):
path: Azure Blob Storage file path pattern in the form
azfs://<storage-account>/<container>/[name].
"""
- container, blob = parse_azfs_path(path)
- blob_to_check = self.client.get_blob_client(container, blob)
try:
- blob_to_check.get_blob_properties()
+ self._blob_properties(path)
return True
except ResourceNotFoundError as e:
if e.status_code == 404:
@@ -339,8 +335,6 @@ class BlobStorageIO(object):
# We re-raise all other exceptions.
raise
- @retry.with_exponential_backoff(
- retry_filter=retry.retry_on_beam_io_error_filter)
def size(self, path):
"""Returns the size of a single Blob Storage blob.
@@ -349,19 +343,8 @@ class BlobStorageIO(object):
Returns: size of the Blob Storage blob in bytes.
"""
- container, blob = parse_azfs_path(path)
- blob_to_check = self.client.get_blob_client(container, blob)
- try:
- properties = blob_to_check.get_blob_properties()
- except ResourceNotFoundError as e:
- message = e.reason
- code = e.status_code
- raise BlobStorageError(message, code)
+ return self._blob_properties(path).size
- return properties.size
-
- @retry.with_exponential_backoff(
- retry_filter=retry.retry_on_beam_io_error_filter)
def last_updated(self, path):
"""Returns the last updated epoch time of a single
Azure Blob Storage blob.
@@ -372,22 +355,8 @@ class BlobStorageIO(object):
Returns: last updated time of the Azure Blob Storage blob
in seconds.
"""
- container, blob = parse_azfs_path(path)
- blob_to_check = self.client.get_blob_client(container, blob)
- try:
- properties = blob_to_check.get_blob_properties()
- except ResourceNotFoundError as e:
- message = e.reason
- code = e.status_code
- raise BlobStorageError(message, code)
+ return self._updated_to_seconds(self._blob_properties(path).last_modified)
- datatime = properties.last_modified
- return (
- time.mktime(datatime.timetuple()) - time.timezone +
- datatime.microsecond / 1000000.0)
-
- @retry.with_exponential_backoff(
- retry_filter=retry.retry_on_beam_io_error_filter)
def checksum(self, path):
"""Looks up the checksum of an Azure Blob Storage blob.
@@ -395,6 +364,40 @@ class BlobStorageIO(object):
path: Azure Blob Storage file path pattern in the form
azfs://<storage-account>/<container>/[name].
"""
+ return self._blob_properties(path).etag
+
+ def _status(self, path):
+ """For internal use only; no backwards-compatibility guarantees.
+
+ Returns supported fields (checksum, last_updated, size) of a single object
+ as a dict at once.
+
+ This method does not perform glob expansion. Hence the given path must be
+ for a single blob property.
+
+ Returns: dict of fields of the blob property.
+ """
+ properties = self._blob_properties(path)
+ file_status = {}
+ if hasattr(properties, 'etag'):
+ file_status['checksum'] = properties.etag
+ if hasattr(properties, 'last_modified'):
+ file_status['last_updated'] = self._updated_to_seconds(
+ properties.last_modified)
+ if hasattr(properties, 'size'):
+ file_status['size'] = properties.size
+ return file_status
+
+ @retry.with_exponential_backoff(
+ retry_filter=retry.retry_on_beam_io_error_filter)
+ def _blob_properties(self, path):
+ """Returns a blob properties object for the given path
+
+ This method does not perform glob expansion. Hence the given path must be
+ for a single blob properties object.
+
+ Returns: blob properties.
+ """
container, blob = parse_azfs_path(path)
blob_to_check = self.client.get_blob_client(container, blob)
try:
@@ -404,7 +407,14 @@ class BlobStorageIO(object):
code = e.status_code
raise BlobStorageError(message, code)
- return properties.etag
+ return properties
+
+ @staticmethod
+ def _updated_to_seconds(updated):
+ """Helper function transform the updated field of response to seconds"""
+ return (
+ time.mktime(updated.timetuple()) - time.timezone +
+ updated.microsecond / 1000000.0)
@retry.with_exponential_backoff(
retry_filter=retry.retry_on_beam_io_error_filter)
@@ -559,40 +569,54 @@ class BlobStorageIO(object):
@retry.with_exponential_backoff(
retry_filter=retry.retry_on_beam_io_error_filter)
- def list_prefix(self, path):
+ def list_prefix(self, path, with_metadata=False):
"""Lists files matching the prefix.
Args:
path: Azure Blob Storage file path pattern in the form
azfs://<storage-account>/<container>/[name].
+ with_metadata: Experimental. Specify whether to return file metadata.
Returns:
- Dictionary of file name -> size.
+ If ``with_metadata`` is False: dict of file name -> size; if
+ ``with_metadata`` is True: dict of file name -> tuple(size, timestamp).
"""
storage_account, container, blob = parse_azfs_path(
path, blob_optional=True, get_account=True)
- file_sizes = {}
+ file_info = {}
counter = 0
start_time = time.time()
- logging.info("Starting the size estimation of the input")
+ if with_metadata:
+ logging.info("Starting the file information of the input")
+ else:
+ logging.info("Starting the size estimation of the input")
container_client = self.client.get_container_client(container)
while True:
response = container_client.list_blobs(name_starts_with=blob)
for item in response:
file_name = "azfs://%s/%s/%s" % (storage_account, container, item.name)
- file_sizes[file_name] = item.size
+ if with_metadata:
+ file_info[file_name] = (
+ item.size, self._updated_to_seconds(item.last_modified))
+ else:
+ file_info[file_name] = item.size
counter += 1
if counter % 10000 == 0:
- logging.info("Finished computing size of: %s files", len(file_sizes))
+ if with_metadata:
+ logging.info(
+ "Finished computing file information of: %s files",
+ len(file_info))
+ else:
+ logging.info("Finished computing size of: %s files", len(file_info))
break
logging.info(
"Finished listing %s files in %s seconds.",
counter,
time.time() - start_time)
- return file_sizes
+ return file_info
class BlobStorageDownloader(Downloader):
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index c854e587972..a833eef282f 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -421,27 +421,44 @@ class CompressedFile(object):
class FileMetadata(object):
- """Metadata about a file path that is the output of FileSystem.match."""
- def __init__(self, path, size_in_bytes):
+ """Metadata about a file path that is the output of FileSystem.match.
+
+ Fields:
+ path: [Required] file path.
+ size_in_bytes: [Required] file size in bytes.
+ last_updated_in_seconds: [Optional] last modified timestamp of the file, or
+ 0.0 if not specified.
+ """
+ def __init__(
+ self,
+ path: str,
+ size_in_bytes: int,
+ last_updated_in_seconds: float = 0.0):
assert isinstance(path, str) and path, "Path should be a string"
assert isinstance(size_in_bytes, int) and size_in_bytes >= 0, \
"Invalid value for size_in_bytes should %s (of type %s)" % (
size_in_bytes, type(size_in_bytes))
self.path = path
self.size_in_bytes = size_in_bytes
+ self.last_updated_in_seconds = last_updated_in_seconds
def __eq__(self, other):
"""Note: This is only used in tests where we verify that mock objects match.
"""
return (
isinstance(other, FileMetadata) and self.path == other.path and
- self.size_in_bytes == other.size_in_bytes)
+ self.size_in_bytes == other.size_in_bytes and
+ self.last_updated_in_seconds == other.last_updated_in_seconds)
def __hash__(self):
- return hash((self.path, self.size_in_bytes))
+ return hash((self.path, self.size_in_bytes, self.last_updated_in_seconds))
def __repr__(self):
- return 'FileMetadata(%s, %s)' % (self.path, self.size_in_bytes)
+ if self.last_updated_in_seconds == 0.0:
+ return 'FileMetadata(%s, %s)' % (self.path, self.size_in_bytes)
+ else:
+ return 'FileMetadata(%s, %s, %s)' % (
+ self.path, self.size_in_bytes, self.last_updated_in_seconds)
class MatchResult(object):
@@ -717,7 +734,7 @@ class FileSystem(BeamPlugin, metaclass=abc.ABCMeta):
if prefix_or_dir == pattern:
# Short-circuit calling self.list() if there's no glob pattern to match.
if self.exists(pattern):
- file_metadatas = [FileMetadata(pattern, self.size(pattern))]
+ file_metadatas = [self.metadata(pattern)]
else:
if self.has_dirs():
prefix_dirname = self._url_dirname(prefix_or_dir)
@@ -878,6 +895,27 @@ class FileSystem(BeamPlugin, metaclass=abc.ABCMeta):
"""
raise NotImplementedError
+ @abc.abstractmethod
+ def metadata(self, path):
+ """Fetch metadata of a file on the
+ :class:`~apache_beam.io.filesystem.FileSystem`.
+
+ This operation returns metadata as stored in the underlying
+ FileSystem. It should not need to read file data to obtain this value.
+ For web based file systems, this method should also incur as few
+ requests as possible.
+
+ Args:
+ path: string path of a file.
+
+ Returns:
+ :class:`~apache_beam.io.filesystem.FileMetadata`.
+
+ Raises:
+ ``BeamIOError``: if path isn't a file or doesn't exist.
+ """
+ raise NotImplementedError
+
@abc.abstractmethod
def delete(self, paths):
"""Deletes files or directories at the provided paths.
diff --git a/sdks/python/apache_beam/io/filesystem_test.py b/sdks/python/apache_beam/io/filesystem_test.py
index a2463352de3..4f57fd52bc8 100644
--- a/sdks/python/apache_beam/io/filesystem_test.py
+++ b/sdks/python/apache_beam/io/filesystem_test.py
@@ -103,6 +103,9 @@ class TestingFileSystem(FileSystem):
def checksum(self, path):
raise NotImplementedError
+ def metadata(self, path):
+ raise NotImplementedError
+
def delete(self, paths):
raise NotImplementedError
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
index e53ceef70c9..6e4b1b6c7f0 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -120,8 +120,9 @@ class GCSFileSystem(FileSystem):
``BeamIOError``: if listing fails, but not if no files were found.
"""
try:
- for path, size in gcsio.GcsIO().list_prefix(dir_or_prefix).items():
- yield FileMetadata(path, size)
+ for path, (size, updated) in gcsio.GcsIO().list_prefix(
+ dir_or_prefix, with_metadata=True).items():
+ yield FileMetadata(path, size, updated)
except Exception as e: # pylint: disable=broad-except
raise BeamIOError("List operation failed", {dir_or_prefix: e})
@@ -311,6 +312,25 @@ class GCSFileSystem(FileSystem):
except Exception as e: # pylint: disable=broad-except
raise BeamIOError("Checksum operation failed", {path: e})
+ def metadata(self, path):
+ """Fetch metadata fields of a file on the FileSystem.
+
+ Args:
+ path: string path of a file.
+
+ Returns:
+ :class:`~apache_beam.io.filesystem.FileMetadata`.
+
+ Raises:
+ ``BeamIOError``: if path isn't a file or doesn't exist.
+ """
+ try:
+ file_metadata = gcsio.GcsIO()._status(path)
+ return FileMetadata(
+ path, file_metadata['size'], file_metadata['last_updated'])
+ except Exception as e: # pylint: disable=broad-except
+ raise BeamIOError("Metadata operation failed", {path: e})
+
def delete(self, paths):
"""Deletes files or directories at the provided paths.
Directories will be deleted recursively.
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
index c5f80bb6a1d..b4d921ada23 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
@@ -83,15 +83,16 @@ class GCSFileSystemTest(unittest.TestCase):
gcsio_mock = mock.MagicMock()
gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
gcsio_mock.list_prefix.return_value = {
- 'gs://bucket/file1': 1, 'gs://bucket/file2': 2
+ 'gs://bucket/file1': (1, 99999.0), 'gs://bucket/file2': (2, 88888.0)
}
expected_results = set([
- FileMetadata('gs://bucket/file1', 1),
- FileMetadata('gs://bucket/file2', 2)
+ FileMetadata('gs://bucket/file1', 1, 99999.0),
+ FileMetadata('gs://bucket/file2', 2, 88888.0)
])
match_result = self.fs.match(['gs://bucket/'])[0]
self.assertEqual(set(match_result.metadata_list), expected_results)
- gcsio_mock.list_prefix.assert_called_once_with('gs://bucket/')
+ gcsio_mock.list_prefix.assert_called_once_with(
+ 'gs://bucket/', with_metadata=True)
@mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
def test_match_multiples_limit(self, mock_gcsio):
@@ -99,12 +100,13 @@ class GCSFileSystemTest(unittest.TestCase):
gcsio_mock = mock.MagicMock()
limit = 1
gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
- gcsio_mock.list_prefix.return_value = {'gs://bucket/file1': 1}
- expected_results = set([FileMetadata('gs://bucket/file1', 1)])
+ gcsio_mock.list_prefix.return_value = {'gs://bucket/file1': (1, 99999.0)}
+ expected_results = set([FileMetadata('gs://bucket/file1', 1, 99999.0)])
match_result = self.fs.match(['gs://bucket/'], [limit])[0]
self.assertEqual(set(match_result.metadata_list), expected_results)
self.assertEqual(len(match_result.metadata_list), limit)
- gcsio_mock.list_prefix.assert_called_once_with('gs://bucket/')
+ gcsio_mock.list_prefix.assert_called_once_with(
+ 'gs://bucket/', with_metadata=True)
@mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
def test_match_multiples_error(self, mock_gcsio):
@@ -119,7 +121,8 @@ class GCSFileSystemTest(unittest.TestCase):
self.fs.match(['gs://bucket/'])
self.assertRegex(
str(error.exception.exception_details), r'gs://bucket/.*%s' % exception)
- gcsio_mock.list_prefix.assert_called_once_with('gs://bucket/')
+ gcsio_mock.list_prefix.assert_called_once_with(
+ 'gs://bucket/', with_metadata=True)
@mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
def test_match_multiple_patterns(self, mock_gcsio):
@@ -128,14 +131,14 @@ class GCSFileSystemTest(unittest.TestCase):
gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
gcsio_mock.list_prefix.side_effect = [
{
- 'gs://bucket/file1': 1
+ 'gs://bucket/file1': (1, 99999.0)
},
{
- 'gs://bucket/file2': 2
+ 'gs://bucket/file2': (2, 88888.0)
},
]
- expected_results = [[FileMetadata('gs://bucket/file1', 1)],
- [FileMetadata('gs://bucket/file2', 2)]]
+ expected_results = [[FileMetadata('gs://bucket/file1', 1, 99999.0)],
+ [FileMetadata('gs://bucket/file2', 2, 88888.0)]]
result = self.fs.match(['gs://bucket/file1*', 'gs://bucket/file2*'])
self.assertEqual([mr.metadata_list for mr in result], expected_results)
@@ -306,7 +309,7 @@ class GCSFileSystemTest(unittest.TestCase):
# Prepare mocks.
gcsio_mock = mock.MagicMock()
gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
- gcsio_mock.size.return_value = 0
+ gcsio_mock._status.return_value = {'size': 0, 'last_updated': 99999.0}
files = [
'gs://bucket/from1',
'gs://bucket/from2',
@@ -324,7 +327,7 @@ class GCSFileSystemTest(unittest.TestCase):
gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
exception = IOError('Failed')
gcsio_mock.delete_batch.side_effect = exception
- gcsio_mock.size.return_value = 0
+ gcsio_mock._status.return_value = {'size': 0, 'last_updated': 99999.0}
files = [
'gs://bucket/from1',
'gs://bucket/from2',
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 86046e340d2..87654bc8174 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -460,19 +460,14 @@ class GcsIO(object):
self.copy(src, dest)
self.delete(src)
- @retry.with_exponential_backoff(
- retry_filter=retry.retry_on_server_errors_and_timeout_filter)
def exists(self, path):
"""Returns whether the given GCS object exists.
Args:
path: GCS file path pattern in the form gs://<bucket>/<name>.
"""
- bucket, object_path = parse_gcs_path(path)
try:
- request = storage.StorageObjectsGetRequest(
- bucket=bucket, object=object_path)
- self.client.objects.Get(request) # metadata
+ self._gcs_object(path) # gcs object
return True
except HttpError as http_error:
if http_error.status_code == 404:
@@ -482,21 +477,14 @@ class GcsIO(object):
# We re-raise all other exceptions
raise
- @retry.with_exponential_backoff(
- retry_filter=retry.retry_on_server_errors_and_timeout_filter)
def checksum(self, path):
"""Looks up the checksum of a GCS object.
Args:
path: GCS file path pattern in the form gs://<bucket>/<name>.
"""
- bucket, object_path = parse_gcs_path(path)
- request = storage.StorageObjectsGetRequest(
- bucket=bucket, object=object_path)
- return self.client.objects.Get(request).crc32c
+ return self._gcs_object(path).crc32c
- @retry.with_exponential_backoff(
- retry_filter=retry.retry_on_server_errors_and_timeout_filter)
def size(self, path):
"""Returns the size of a single GCS object.
@@ -505,13 +493,8 @@ class GcsIO(object):
Returns: size of the GCS object in bytes.
"""
- bucket, object_path = parse_gcs_path(path)
- request = storage.StorageObjectsGetRequest(
- bucket=bucket, object=object_path)
- return self.client.objects.Get(request).size
+ return self._gcs_object(path).size
- @retry.with_exponential_backoff(
- retry_filter=retry.retry_on_server_errors_and_timeout_filter)
def kms_key(self, path):
"""Returns the KMS key of a single GCS object.
@@ -521,13 +504,8 @@ class GcsIO(object):
Returns: KMS key name of the GCS object as a string, or None if it doesn't
have one.
"""
- bucket, object_path = parse_gcs_path(path)
- request = storage.StorageObjectsGetRequest(
- bucket=bucket, object=object_path)
- return self.client.objects.Get(request).kmsKeyName
+ return self._gcs_object(path).kmsKeyName
- @retry.with_exponential_backoff(
- retry_filter=retry.retry_on_server_errors_and_timeout_filter)
def last_updated(self, path):
"""Returns the last updated epoch time of a single GCS object.
@@ -536,39 +514,85 @@ class GcsIO(object):
Returns: last updated time of the GCS object in seconds.
"""
+ return self._updated_to_seconds(self._gcs_object(path).updated)
+
+ def _status(self, path):
+ """For internal use only; no backwards-compatibility guarantees.
+
+ Returns supported fields (checksum, kms_key, last_updated, size) of a
+ single object as a dict at once.
+
+ This method does not perform glob expansion. Hence the given path must be
+ for a single GCS object.
+
+ Returns: dict of fields of the GCS object.
+ """
+ gcs_object = self._gcs_object(path)
+ file_status = {}
+ if hasattr(gcs_object, 'crc32c'):
+ file_status['checksum'] = gcs_object.crc32c
+ if hasattr(gcs_object, 'kmsKeyName'):
+ file_status['kms_key'] = gcs_object.kmsKeyName
+ if hasattr(gcs_object, 'updated'):
+ file_status['last_updated'] = self._updated_to_seconds(gcs_object.updated)
+ if hasattr(gcs_object, 'size'):
+ file_status['size'] = gcs_object.size
+ return file_status
+
+ @retry.with_exponential_backoff(
+ retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+ def _gcs_object(self, path):
+ """Returns a gcs object for the given path
+
+ This method does not perform glob expansion. Hence the given path must be
+ for a single GCS object.
+
+ Returns: GCS object.
+ """
bucket, object_path = parse_gcs_path(path)
request = storage.StorageObjectsGetRequest(
bucket=bucket, object=object_path)
- datetime = self.client.objects.Get(request).updated
- return (
- time.mktime(datetime.timetuple()) - time.timezone +
- datetime.microsecond / 1000000.0)
+ return self.client.objects.Get(request)
@retry.with_exponential_backoff(
retry_filter=retry.retry_on_server_errors_and_timeout_filter)
- def list_prefix(self, path):
+ def list_prefix(self, path, with_metadata=False):
"""Lists files matching the prefix.
Args:
path: GCS file path pattern in the form gs://<bucket>/[name].
+ with_metadata: Experimental. Specify whether to return file metadata.
Returns:
- Dictionary of file name -> size.
+ If ``with_metadata`` is False: dict of file name -> size; if
+ ``with_metadata`` is True: dict of file name -> tuple(size, timestamp).
"""
bucket, prefix = parse_gcs_path(path, object_optional=True)
request = storage.StorageObjectsListRequest(bucket=bucket, prefix=prefix)
- file_sizes = {}
+ file_info = {}
counter = 0
start_time = time.time()
- _LOGGER.info("Starting the size estimation of the input")
+ if with_metadata:
+ _LOGGER.info("Starting the file information of the input")
+ else:
+ _LOGGER.info("Starting the size estimation of the input")
while True:
response = self.client.objects.List(request)
for item in response.items:
file_name = 'gs://%s/%s' % (item.bucket, item.name)
- file_sizes[file_name] = item.size
+ if with_metadata:
+ file_info[file_name] = (
+ item.size, self._updated_to_seconds(item.updated))
+ else:
+ file_info[file_name] = item.size
counter += 1
if counter % 10000 == 0:
- _LOGGER.info("Finished computing size of: %s files", len(file_sizes))
+ if with_metadata:
+ _LOGGER.info(
+ "Finished computing file information of: %s files",
+ len(file_info))
+ else:
+ _LOGGER.info("Finished computing size of: %s files", len(file_info))
if response.nextPageToken:
request.pageToken = response.nextPageToken
else:
@@ -577,7 +601,14 @@ class GcsIO(object):
"Finished listing %s files in %s seconds.",
counter,
time.time() - start_time)
- return file_sizes
+ return file_info
+
+ @staticmethod
+ def _updated_to_seconds(updated):
+ """Helper function transform the updated field of response to seconds"""
+ return (
+ time.mktime(updated.timetuple()) - time.timezone +
+ updated.microsecond / 1000000.0)
class GcsDownloader(Downloader):
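The _status() helpers added to GcsIO, S3IO, and BlobStorageIO share one shape: a single metadata request, then a dict containing only the fields that backend exposes. The filesystem-level metadata() methods consume two of those fields. A sketch (internal API with no backwards-compatibility guarantees, per its docstring; assumes GCP credentials and a hypothetical existing object):

    from apache_beam.io.filesystem import FileMetadata
    from apache_beam.io.gcp import gcsio

    path = 'gs://my-bucket/my-object'  # hypothetical
    file_status = gcsio.GcsIO()._status(path)
    # Mirrors what GCSFileSystem.metadata() does with the dict:
    print(FileMetadata(path, file_status['size'], file_status['last_updated']))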
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index 73a7984f320..a4aa2d4aa85 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -405,6 +405,26 @@ class TestGCSIO(unittest.TestCase):
self.assertTrue(self.gcs.exists(file_name))
self.assertEqual(last_updated, self.gcs.last_updated(file_name))
+ def test_file_status(self):
+ file_name = 'gs://gcsio-test/dummy_file'
+ file_size = 1234
+ last_updated = 123456.78
+ checksum = 'deadbeef'
+
+ self._insert_random_file(
+ self.client,
+ file_name,
+ file_size,
+ last_updated=last_updated,
+ crc32c=checksum)
+ file_checksum = self.gcs.checksum(file_name)
+
+ file_status = self.gcs._status(file_name)
+
+ self.assertEqual(file_status['size'], file_size)
+ self.assertEqual(file_status['checksum'], file_checksum)
+ self.assertEqual(file_status['last_updated'], last_updated)
+
def test_file_mode(self):
file_name = 'gs://gcsio-test/dummy_mode_file'
with self.gcs.open(file_name, 'wb') as f:
diff --git a/sdks/python/apache_beam/io/hadoopfilesystem.py b/sdks/python/apache_beam/io/hadoopfilesystem.py
index 046908d930e..825e9ca8326 100644
--- a/sdks/python/apache_beam/io/hadoopfilesystem.py
+++ b/sdks/python/apache_beam/io/hadoopfilesystem.py
@@ -51,6 +51,7 @@ _FILE_CHECKSUM_BYTES = 'bytes'
_FILE_CHECKSUM_LENGTH = 'length'
# WebHDFS FileStatus property constants.
_FILE_STATUS_LENGTH = 'length'
+_FILE_STATUS_UPDATED = 'modificationTime'
_FILE_STATUS_PATH_SUFFIX = 'pathSuffix'
_FILE_STATUS_TYPE = 'type'
_FILE_STATUS_TYPE_DIRECTORY = 'DIRECTORY'
@@ -212,7 +213,8 @@ class HadoopFileSystem(FileSystem):
for res in self._hdfs_client.list(path, status=True):
yield FileMetadata(
_HDFS_PREFIX + self._join(server, path, res[0]),
- res[1][_FILE_STATUS_LENGTH])
+ res[1][_FILE_STATUS_LENGTH],
+ res[1][_FILE_STATUS_UPDATED] / 1000.0)
except Exception as e: # pylint: disable=broad-except
raise BeamIOError('List operation failed', {url: e})
@@ -376,20 +378,37 @@ class HadoopFileSystem(FileSystem):
return self._hdfs_client.status(path, strict=False) is not None
def size(self, url):
- _, path = self._parse_url(url)
- status = self._hdfs_client.status(path, strict=False)
- if status is None:
- raise BeamIOError('File not found: %s' % url)
- return status[_FILE_STATUS_LENGTH]
+ """Fetches file size for a URL.
+
+ Returns:
+ int size of path according to the FileSystem.
+
+ Raises:
+ ``BeamIOError``: if url doesn't exist.
+ """
+ return self.metadata(url).size_in_bytes
def last_updated(self, url):
- raise NotImplementedError
+ """Fetches last updated time for a URL.
+
+ Args:
+ url: string url of file.
+
+ Returns: float UNIX Epoch time
+
+ Raises:
+ ``BeamIOError``: if path doesn't exist.
+ """
+ return self.metadata(url).last_updated_in_seconds
def checksum(self, url):
"""Fetches a checksum description for a URL.
Returns:
String describing the checksum.
+
+ Raises:
+ ``BeamIOError``: if url doesn't exist.
"""
_, path = self._parse_url(url)
file_checksum = self._hdfs_client.checksum(path)
@@ -399,6 +418,25 @@ class HadoopFileSystem(FileSystem):
file_checksum[_FILE_CHECKSUM_BYTES],
)
+ def metadata(self, url):
+ """Fetch metadata fields of a file on the FileSystem.
+
+ Args:
+ url: string url of a file.
+
+ Returns:
+ :class:`~apache_beam.io.filesystem.FileMetadata`.
+
+ Raises:
+ ``BeamIOError``: if url doesn't exist.
+ """
+ _, path = self._parse_url(url)
+ status = self._hdfs_client.status(path, strict=False)
+ if status is None:
+ raise BeamIOError('File not found: %s' % url)
+ return FileMetadata(
+ url, status[_FILE_STATUS_LENGTH], status[_FILE_STATUS_UPDATED] / 1000.0)
+
def delete(self, urls):
exceptions = {}
for url in urls:
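One unit detail in the HDFS changes: WebHDFS reports modificationTime in epoch milliseconds, hence the division by 1000.0 whenever the value feeds FileMetadata.last_updated_in_seconds. For instance:

    # WebHDFS FileStatus uses epoch milliseconds; FileMetadata expects seconds.
    status = {'length': 5, 'modificationTime': 1651519656500}  # example values
    print(status['modificationTime'] / 1000.0)                 # 1651519656.5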
diff --git a/sdks/python/apache_beam/io/hadoopfilesystem_test.py b/sdks/python/apache_beam/io/hadoopfilesystem_test.py
index ed56927cec0..2486b1c8717 100644
--- a/sdks/python/apache_beam/io/hadoopfilesystem_test.py
+++ b/sdks/python/apache_beam/io/hadoopfilesystem_test.py
@@ -22,6 +22,7 @@
import io
import logging
import posixpath
+import time
import unittest
from parameterized import parameterized_class
@@ -36,17 +37,16 @@ class FakeFile(io.BytesIO):
"""File object for FakeHdfs"""
__hash__ = None # type: ignore[assignment]
- def __init__(self, path, mode='', type='FILE'):
+ def __init__(self, path, mode='', type='FILE', time_ms=None):
io.BytesIO.__init__(self)
-
- self.stat = {
- 'path': path,
- 'mode': mode,
- 'type': type,
- }
+ if time_ms is None:
+ time_ms = int(time.time() * 1000)
+ self.time_ms = time_ms
+ self.stat = {'path': path, 'mode': mode, 'type': type}
self.saved_data = None
def __eq__(self, other):
+ """Equality of two files. Timestamp not included in comparison"""
return self.stat == other.stat and self.getvalue() == self.getvalue()
def close(self):
@@ -73,6 +73,7 @@ class FakeFile(io.BytesIO):
hdfs._FILE_STATUS_PATH_SUFFIX: posixpath.basename(self.stat['path']),
hdfs._FILE_STATUS_LENGTH: self.size,
hdfs._FILE_STATUS_TYPE: self.stat['type'],
+ hdfs._FILE_STATUS_UPDATED: self.time_ms
}
def get_file_checksum(self):
@@ -538,6 +539,16 @@ class HadoopFileSystemTest(unittest.TestCase):
self.assertEqual(
'fake_algo-5-checksum_byte_sequence', self.fs.checksum(url))
+ def test_last_updated(self):
+ url = self.fs.join(self.tmpdir, 'f1')
+ with self.fs.create(url) as f:
+ f.write(b'Hello')
+ # The time difference should be tiny for the mock hdfs.
+ # A loose tolerance accounts for runs against a real WebHDFS service.
+ tolerance = 5 * 60 # 5 mins
+ result = self.fs.last_updated(url)
+ self.assertAlmostEqual(result, time.time(), delta=tolerance)
+
def test_delete_file(self):
url = self.fs.join(self.tmpdir, 'old_file1')
diff --git a/sdks/python/apache_beam/io/localfilesystem.py b/sdks/python/apache_beam/io/localfilesystem.py
index 56668aa8952..3580b79ea56 100644
--- a/sdks/python/apache_beam/io/localfilesystem.py
+++ b/sdks/python/apache_beam/io/localfilesystem.py
@@ -121,7 +121,7 @@ class LocalFileSystem(FileSystem):
try:
for f in list_files(dir_or_prefix):
try:
- yield FileMetadata(f, os.path.getsize(f))
+ yield FileMetadata(f, os.path.getsize(f), os.path.getmtime(f))
except OSError:
# Files may disappear, such as when listing /tmp.
pass
@@ -311,6 +311,22 @@ class LocalFileSystem(FileSystem):
raise BeamIOError('Path does not exist: %s' % path)
return str(os.path.getsize(path))
+ def metadata(self, path):
+ """Fetch metadata fields of a file on the FileSystem.
+
+ Args:
+ path: string path of a file.
+
+ Returns:
+ :class:`~apache_beam.io.filesystem.FileMetadata`.
+
+ Raises:
+ ``BeamIOError``: if path isn't a file or doesn't exist.
+ """
+ if not self.exists(path):
+ raise BeamIOError('Path does not exist: %s' % path)
+ return FileMetadata(path, os.path.getsize(path), os.path.getmtime(path))
+
def delete(self, paths):
"""Deletes files or directories at the provided paths.
Directories will be deleted recursively.
diff --git a/sdks/python/apache_beam/io/localfilesystem_test.py b/sdks/python/apache_beam/io/localfilesystem_test.py
index 84455c28f4c..1370790970e 100644
--- a/sdks/python/apache_beam/io/localfilesystem_test.py
+++ b/sdks/python/apache_beam/io/localfilesystem_test.py
@@ -293,8 +293,14 @@ class LocalFileSystemTest(unittest.TestCase):
f.write('Hello')
with open(path2, 'a') as f:
f.write('foo')
- self.assertEqual(self.fs.checksum(path1), str(5))
- self.assertEqual(self.fs.checksum(path2), str(3))
+ # tests that localfilesystem checksum returns file size
+ checksum1 = self.fs.checksum(path1)
+ checksum2 = self.fs.checksum(path2)
+ self.assertEqual(checksum1, str(5))
+ self.assertEqual(checksum2, str(3))
+ # tests that fs.checksum and str(fs.size) are consistent
+ self.assertEqual(checksum1, str(self.fs.size(path1)))
+ self.assertEqual(checksum2, str(self.fs.size(path2)))
def make_tree(self, path, value, expected_leaf_count=None):
"""Create a file+directory structure from a simple dict-based DSL