You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by yh...@apache.org on 2022/12/01 22:43:22 UTC
[beam] branch master updated: Reduce calls to FileSystem.match and API calls in FileSystem._list (#24317)
This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 03052670499 Reduce calls to FileSystem.match and API calls in FileSystem._list (#24317)
03052670499 is described below
commit 03052670499ad953056dd55914d58c0c49961171
Author: Yi Hu <ya...@google.com>
AuthorDate: Thu Dec 1 17:43:15 2022 -0500
Reduce calls to FileSystem.match and API calls in FileSystem._list (#24317)
* Reuse concat_source in FileBasedSource.estimate_size
* Create list_files to replace list_prefix, listing files lazily
in GcsIO, S3IO, and BlobStorageIO
---
sdks/python/apache_beam/io/aws/s3filesystem.py | 4 +-
.../python/apache_beam/io/aws/s3filesystem_test.py | 30 +++++------
sdks/python/apache_beam/io/aws/s3io.py | 58 +++++++++++++++------
.../apache_beam/io/azure/blobstoragefilesystem.py | 4 +-
.../io/azure/blobstoragefilesystem_test.py | 32 +++++-------
sdks/python/apache_beam/io/azure/blobstorageio.py | 49 +++++++++++++-----
sdks/python/apache_beam/io/filebasedsource.py | 5 +-
sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 4 +-
.../apache_beam/io/gcp/gcsfilesystem_test.py | 29 +++++------
sdks/python/apache_beam/io/gcp/gcsio.py | 60 ++++++++++++++++------
10 files changed, 171 insertions(+), 104 deletions(-)
diff --git a/sdks/python/apache_beam/io/aws/s3filesystem.py b/sdks/python/apache_beam/io/aws/s3filesystem.py
index 68d8ea7e313..636b0a12f3e 100644
--- a/sdks/python/apache_beam/io/aws/s3filesystem.py
+++ b/sdks/python/apache_beam/io/aws/s3filesystem.py
@@ -128,8 +128,8 @@ class S3FileSystem(FileSystem):
``BeamIOError``: if listing fails, but not if no files were found.
"""
try:
- for path, (size, updated) in s3io.S3IO(options=self._options) \
- .list_prefix(dir_or_prefix, with_metadata=True).items():
+ for path, (size, updated) in s3io.S3IO(options=self._options).list_files(
+ dir_or_prefix, with_metadata=True):
yield FileMetadata(path, size, updated)
except Exception as e: # pylint: disable=broad-except
raise BeamIOError("List operation failed", {dir_or_prefix: e})
diff --git a/sdks/python/apache_beam/io/aws/s3filesystem_test.py b/sdks/python/apache_beam/io/aws/s3filesystem_test.py
index dd7209f0cbb..60e6f319b2c 100644
--- a/sdks/python/apache_beam/io/aws/s3filesystem_test.py
+++ b/sdks/python/apache_beam/io/aws/s3filesystem_test.py
@@ -95,10 +95,10 @@ class S3FileSystemTest(unittest.TestCase):
# Prepare mocks.
s3io_mock = mock.MagicMock()
s3filesystem.s3io.S3IO = lambda options: s3io_mock # type: ignore[misc]
- s3io_mock.list_prefix.return_value = {
- 's3://bucket/file1': (1, 9999999.0),
- 's3://bucket/file2': (2, 8888888.0)
- }
+ s3io_mock.list_files.return_value = iter([
+ ('s3://bucket/file1', (1, 9999999.0)),
+ ('s3://bucket/file2', (2, 8888888.0))
+ ])
expected_results = set([
FileMetadata('s3://bucket/file1', 1, 9999999.0),
FileMetadata('s3://bucket/file2', 2, 8888888.0)
@@ -106,7 +106,7 @@ class S3FileSystemTest(unittest.TestCase):
match_result = self.fs.match(['s3://bucket/'])[0]
self.assertEqual(set(match_result.metadata_list), expected_results)
- s3io_mock.list_prefix.assert_called_once_with(
+ s3io_mock.list_files.assert_called_once_with(
's3://bucket/', with_metadata=True)
@mock.patch('apache_beam.io.aws.s3filesystem.s3io')
@@ -115,12 +115,14 @@ class S3FileSystemTest(unittest.TestCase):
s3io_mock = mock.MagicMock()
limit = 1
s3filesystem.s3io.S3IO = lambda options: s3io_mock # type: ignore[misc]
- s3io_mock.list_prefix.return_value = {'s3://bucket/file1': (1, 99999.0)}
+ s3io_mock.list_files.return_value = iter([
+ ('s3://bucket/file1', (1, 99999.0))
+ ])
expected_results = set([FileMetadata('s3://bucket/file1', 1, 99999.0)])
match_result = self.fs.match(['s3://bucket/'], [limit])[0]
self.assertEqual(set(match_result.metadata_list), expected_results)
self.assertEqual(len(match_result.metadata_list), limit)
- s3io_mock.list_prefix.assert_called_once_with(
+ s3io_mock.list_files.assert_called_once_with(
's3://bucket/', with_metadata=True)
@mock.patch('apache_beam.io.aws.s3filesystem.s3io')
@@ -129,13 +131,13 @@ class S3FileSystemTest(unittest.TestCase):
s3io_mock = mock.MagicMock()
s3filesystem.s3io.S3IO = lambda options: s3io_mock # type: ignore[misc]
exception = IOError('Failed')
- s3io_mock.list_prefix.side_effect = exception
+ s3io_mock.list_files.side_effect = exception
with self.assertRaises(BeamIOError) as error:
self.fs.match(['s3://bucket/'])
self.assertIn('Match operation failed', str(error.exception))
- s3io_mock.list_prefix.assert_called_once_with(
+ s3io_mock.list_files.assert_called_once_with(
's3://bucket/', with_metadata=True)
@mock.patch('apache_beam.io.aws.s3filesystem.s3io')
@@ -143,13 +145,9 @@ class S3FileSystemTest(unittest.TestCase):
# Prepare mocks.
s3io_mock = mock.MagicMock()
s3filesystem.s3io.S3IO = lambda options: s3io_mock # type: ignore[misc]
- s3io_mock.list_prefix.side_effect = [
- {
- 's3://bucket/file1': (1, 99999.0)
- },
- {
- 's3://bucket/file2': (2, 88888.0)
- },
+ s3io_mock.list_files.side_effect = [
+ iter([('s3://bucket/file1', (1, 99999.0))]),
+ iter([('s3://bucket/file2', (2, 88888.0))]),
]
expected_results = [[FileMetadata('s3://bucket/file1', 1, 99999.0)],
[FileMetadata('s3://bucket/file2', 2, 88888.0)]]
diff --git a/sdks/python/apache_beam/io/aws/s3io.py b/sdks/python/apache_beam/io/aws/s3io.py
index 66bdaa52e90..887bb4c7baa 100644
--- a/sdks/python/apache_beam/io/aws/s3io.py
+++ b/sdks/python/apache_beam/io/aws/s3io.py
@@ -33,6 +33,7 @@ from apache_beam.io.filesystemio import DownloaderStream
from apache_beam.io.filesystemio import Uploader
from apache_beam.io.filesystemio import UploaderStream
from apache_beam.utils import retry
+from apache_beam.utils.annotations import deprecated
try:
# pylint: disable=wrong-import-order, wrong-import-position
@@ -99,11 +100,13 @@ class S3IO(object):
else:
raise ValueError('Invalid file open mode: %s.' % mode)
- @retry.with_exponential_backoff(
- retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+ @deprecated(since='2.45.0', current='list_files')
def list_prefix(self, path, with_metadata=False):
"""Lists files matching the prefix.
+ ``list_prefix`` has been deprecated. Use `list_files` instead, which returns
+ a generator of file information instead of a dict.
+
Args:
path: S3 file path pattern in the form s3://<bucket>/[name].
with_metadata: Experimental. Specify whether returns file metadata.
@@ -112,10 +115,28 @@ class S3IO(object):
If ``with_metadata`` is False: dict of file name -> size; if
``with_metadata`` is True: dict of file name -> tuple(size, timestamp).
"""
+ file_info = {}
+ for file_metadata in self.list_files(path, with_metadata):
+ file_info[file_metadata[0]] = file_metadata[1]
+
+ return file_info
+
+ def list_files(self, path, with_metadata=False):
+ """Lists files matching the prefix.
+
+ Args:
+ path: S3 file path pattern in the form s3://<bucket>/[name].
+ with_metadata: Experimental. Specify whether returns file metadata.
+
+ Returns:
+ If ``with_metadata`` is False: generator of tuple(file name, size); if
+ ``with_metadata`` is True: generator of
+ tuple(file name, tuple(size, timestamp)).
+ """
bucket, prefix = parse_s3_path(path, object_optional=True)
request = messages.ListRequest(bucket=bucket, prefix=prefix)
- file_info = {}
+ file_info = set()
counter = 0
start_time = time.time()
@@ -130,7 +151,10 @@ class S3IO(object):
#This should not be an issue here.
#Ignore this exception or it will break the procedure.
try:
- response = self.client.list(request)
+ response = retry.with_exponential_backoff(
+ retry_filter=retry.retry_on_server_errors_and_timeout_filter)(
+ self.client.list)(
+ request)
except messages.S3ClientError as e:
if e.code == 404:
break
@@ -139,19 +163,23 @@ class S3IO(object):
for item in response.items:
file_name = 's3://%s/%s' % (bucket, item.key)
- if with_metadata:
- file_info[file_name] = (
- item.size, self._updated_to_seconds(item.last_modified))
- else:
- file_info[file_name] = item.size
- counter += 1
- if counter % 10000 == 0:
+ if file_name not in file_info:
+ file_info.add(file_name)
+ counter += 1
+ if counter % 10000 == 0:
+ if with_metadata:
+ logging.info(
+ "Finished computing file information of: %s files",
+ len(file_info))
+ else:
+ logging.info(
+ "Finished computing size of: %s files", len(file_info))
if with_metadata:
- logging.info(
- "Finished computing file information of: %s files",
- len(file_info))
+ yield file_name, (
+ item.size, self._updated_to_seconds(item.last_modified))
else:
- logging.info("Finished computing size of: %s files", len(file_info))
+ yield file_name, item.size
+
if response.next_token:
request.continuation_token = response.next_token
else:
diff --git a/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py b/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
index cc678f9ae0f..8bc3dd68281 100644
--- a/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
+++ b/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
@@ -122,8 +122,8 @@ class BlobStorageFileSystem(FileSystem):
``BeamIOError``: if listing fails, but not if no files were found.
"""
try:
- for path, (size, updated) in self._blobstorageIO() \
- .list_prefix(dir_or_prefix, with_metadata=True).items():
+ for path, (size, updated) in self._blobstorageIO().list_files(
+ dir_or_prefix, with_metadata=True):
yield FileMetadata(path, size, updated)
except Exception as e: # pylint: disable=broad-except
raise BeamIOError("List operation failed", {dir_or_prefix: e})
diff --git a/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py b/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py
index 6c844329867..cee459f5b8a 100644
--- a/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py
+++ b/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py
@@ -108,10 +108,10 @@ class BlobStorageFileSystemTest(unittest.TestCase):
blobstorageio_mock = mock.MagicMock()
blobstoragefilesystem.blobstorageio.BlobStorageIO = \
lambda pipeline_options: blobstorageio_mock
- blobstorageio_mock.list_prefix.return_value = {
- 'azfs://storageaccount/container/file1': (1, 99999.0),
- 'azfs://storageaccount/container/file2': (2, 88888.0)
- }
+ blobstorageio_mock.list_files.return_value = iter([
+ ('azfs://storageaccount/container/file1', (1, 99999.0)),
+ ('azfs://storageaccount/container/file2', (2, 88888.0))
+ ])
expected_results = set([
FileMetadata('azfs://storageaccount/container/file1', 1, 99999.0),
FileMetadata('azfs://storageaccount/container/file2', 2, 88888.0),
@@ -119,7 +119,7 @@ class BlobStorageFileSystemTest(unittest.TestCase):
match_result = self.fs.match(['azfs://storageaccount/container/'])[0]
self.assertEqual(set(match_result.metadata_list), expected_results)
- blobstorageio_mock.list_prefix.assert_called_once_with(
+ blobstorageio_mock.list_files.assert_called_once_with(
'azfs://storageaccount/container/', with_metadata=True)
@mock.patch('apache_beam.io.azure.blobstoragefilesystem.blobstorageio')
@@ -129,16 +129,16 @@ class BlobStorageFileSystemTest(unittest.TestCase):
limit = 1
blobstoragefilesystem.blobstorageio.BlobStorageIO = \
lambda pipeline_options: blobstorageio_mock
- blobstorageio_mock.list_prefix.return_value = {
- 'azfs://storageaccount/container/file1': (1, 99999.0)
- }
+ blobstorageio_mock.list_files.return_value = iter([
+ ('azfs://storageaccount/container/file1', (1, 99999.0))
+ ])
expected_results = set(
[FileMetadata('azfs://storageaccount/container/file1', 1, 99999.0)])
match_result = self.fs.match(['azfs://storageaccount/container/'],
[limit])[0]
self.assertEqual(set(match_result.metadata_list), expected_results)
self.assertEqual(len(match_result.metadata_list), limit)
- blobstorageio_mock.list_prefix.assert_called_once_with(
+ blobstorageio_mock.list_files.assert_called_once_with(
'azfs://storageaccount/container/', with_metadata=True)
@mock.patch('apache_beam.io.azure.blobstoragefilesystem.blobstorageio')
@@ -148,7 +148,7 @@ class BlobStorageFileSystemTest(unittest.TestCase):
blobstoragefilesystem.blobstorageio.BlobStorageIO = \
lambda pipeline_options: blobstorageio_mock
exception = IOError('Failed')
- blobstorageio_mock.list_prefix.side_effect = exception
+ blobstorageio_mock.list_files.side_effect = exception
with self.assertRaisesRegex(BeamIOError,
r'^Match operation failed') as error:
@@ -157,7 +157,7 @@ class BlobStorageFileSystemTest(unittest.TestCase):
self.assertRegex(
str(error.exception.exception_details),
r'azfs://storageaccount/container/.*%s' % exception)
- blobstorageio_mock.list_prefix.assert_called_once_with(
+ blobstorageio_mock.list_files.assert_called_once_with(
'azfs://storageaccount/container/', with_metadata=True)
@mock.patch('apache_beam.io.azure.blobstoragefilesystem.blobstorageio')
@@ -166,13 +166,9 @@ class BlobStorageFileSystemTest(unittest.TestCase):
blobstorageio_mock = mock.MagicMock()
blobstoragefilesystem.blobstorageio.BlobStorageIO = \
lambda pipeline_options: blobstorageio_mock
- blobstorageio_mock.list_prefix.side_effect = [
- {
- 'azfs://storageaccount/container/file1': (1, 99999.0)
- },
- {
- 'azfs://storageaccount/container/file2': (2, 88888.0)
- },
+ blobstorageio_mock.list_files.side_effect = [
+ iter([('azfs://storageaccount/container/file1', (1, 99999.0))]),
+ iter([('azfs://storageaccount/container/file2', (2, 88888.0))]),
]
expected_results = [
[FileMetadata('azfs://storageaccount/container/file1', 1, 99999.0)],
diff --git a/sdks/python/apache_beam/io/azure/blobstorageio.py b/sdks/python/apache_beam/io/azure/blobstorageio.py
index 948dda32b8f..cfa4fe7d291 100644
--- a/sdks/python/apache_beam/io/azure/blobstorageio.py
+++ b/sdks/python/apache_beam/io/azure/blobstorageio.py
@@ -35,6 +35,7 @@ from apache_beam.io.filesystemio import Uploader
from apache_beam.io.filesystemio import UploaderStream
from apache_beam.options.pipeline_options import AzureOptions
from apache_beam.utils import retry
+from apache_beam.utils.annotations import deprecated
_LOGGER = logging.getLogger(__name__)
@@ -578,8 +579,7 @@ class BlobStorageIO(object):
return results
- @retry.with_exponential_backoff(
- retry_filter=retry.retry_on_beam_io_error_filter)
+ @deprecated(since='2.45.0', current='list_files')
def list_prefix(self, path, with_metadata=False):
"""Lists files matching the prefix.
@@ -592,9 +592,28 @@ class BlobStorageIO(object):
If ``with_metadata`` is False: dict of file name -> size; if
``with_metadata`` is True: dict of file name -> tuple(size, timestamp).
"""
+ file_info = {}
+ for file_metadata in self.list_files(path, with_metadata):
+ file_info[file_metadata[0]] = file_metadata[1]
+
+ return file_info
+
+ def list_files(self, path, with_metadata=False):
+ """Lists files matching the prefix.
+
+ Args:
+ path: Azure Blob Storage file path pattern in the form
+ azfs://<storage-account>/<container>/[name].
+ with_metadata: Experimental. Specify whether returns file metadata.
+
+ Returns:
+ If ``with_metadata`` is False: generator of tuple(file name, size); if
+ ``with_metadata`` is True: generator of
+ tuple(file name, tuple(size, timestamp)).
+ """
storage_account, container, blob = parse_azfs_path(
path, blob_optional=True, get_account=True)
- file_info = {}
+ file_info = set()
counter = 0
start_time = time.time()
@@ -604,15 +623,14 @@ class BlobStorageIO(object):
logging.debug("Starting the size estimation of the input")
container_client = self.client.get_container_client(container)
- while True:
- response = container_client.list_blobs(name_starts_with=blob)
- for item in response:
- file_name = "azfs://%s/%s/%s" % (storage_account, container, item.name)
- if with_metadata:
- file_info[file_name] = (
- item.size, self._updated_to_seconds(item.last_modified))
- else:
- file_info[file_name] = item.size
+ response = retry.with_exponential_backoff(
+ retry_filter=retry.retry_on_beam_io_error_filter)(
+ container_client.list_blobs)(
+ name_starts_with=blob)
+ for item in response:
+ file_name = "azfs://%s/%s/%s" % (storage_account, container, item.name)
+ if file_name not in file_info:
+ file_info.add(file_name)
counter += 1
if counter % 10000 == 0:
if with_metadata:
@@ -621,7 +639,11 @@ class BlobStorageIO(object):
len(file_info))
else:
logging.info("Finished computing size of: %s files", len(file_info))
- break
+ if with_metadata:
+ yield file_name, (
+ item.size, self._updated_to_seconds(item.last_modified))
+ else:
+ yield file_name, item.size
logging.log(
# do not spam logs when list_prefix is likely used to check empty folder
@@ -629,7 +651,6 @@ class BlobStorageIO(object):
"Finished listing %s files in %s seconds.",
counter,
time.time() - start_time)
- return file_info
class BlobStorageDownloader(Downloader):
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index e2cd26868e5..240fc65c52b 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -196,11 +196,8 @@ class FileBasedSource(iobase.BoundedSource):
start_position=start_position,
stop_position=stop_position)
- @check_accessible(['_pattern'])
def estimate_size(self):
- pattern = self._pattern.get()
- match_result = FileSystems.match([pattern])[0]
- return sum([f.size_in_bytes for f in match_result.metadata_list])
+ return self._get_concat_source().estimate_size()
def read(self, range_tracker):
return self._get_concat_source().read(range_tracker)
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
index 11184cd34fd..c87a8499c91 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -131,8 +131,8 @@ class GCSFileSystem(FileSystem):
``BeamIOError``: if listing fails, but not if no files were found.
"""
try:
- for path, (size, updated) in self._gcsIO().list_prefix(
- dir_or_prefix, with_metadata=True).items():
+ for path, (size, updated) in self._gcsIO().list_files(dir_or_prefix,
+ with_metadata=True):
yield FileMetadata(path, size, updated)
except Exception as e: # pylint: disable=broad-except
raise BeamIOError("List operation failed", {dir_or_prefix: e})
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
index 49b0bdc9f6c..800bd5d1c46 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
@@ -82,16 +82,17 @@ class GCSFileSystemTest(unittest.TestCase):
# Prepare mocks.
gcsio_mock = mock.MagicMock()
gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
- gcsio_mock.list_prefix.return_value = {
- 'gs://bucket/file1': (1, 99999.0), 'gs://bucket/file2': (2, 88888.0)
- }
+ gcsio_mock.list_files.return_value = iter([
+ ('gs://bucket/file1', (1, 99999.0)),
+ ('gs://bucket/file2', (2, 88888.0))
+ ])
expected_results = set([
FileMetadata('gs://bucket/file1', 1, 99999.0),
FileMetadata('gs://bucket/file2', 2, 88888.0)
])
match_result = self.fs.match(['gs://bucket/'])[0]
self.assertEqual(set(match_result.metadata_list), expected_results)
- gcsio_mock.list_prefix.assert_called_once_with(
+ gcsio_mock.list_files.assert_called_once_with(
'gs://bucket/', with_metadata=True)
@mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
@@ -100,12 +101,14 @@ class GCSFileSystemTest(unittest.TestCase):
gcsio_mock = mock.MagicMock()
limit = 1
gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
- gcsio_mock.list_prefix.return_value = {'gs://bucket/file1': (1, 99999.0)}
+ gcsio_mock.list_files.return_value = iter([
+ ('gs://bucket/file1', (1, 99999.0))
+ ])
expected_results = set([FileMetadata('gs://bucket/file1', 1, 99999.0)])
match_result = self.fs.match(['gs://bucket/'], [limit])[0]
self.assertEqual(set(match_result.metadata_list), expected_results)
self.assertEqual(len(match_result.metadata_list), limit)
- gcsio_mock.list_prefix.assert_called_once_with(
+ gcsio_mock.list_files.assert_called_once_with(
'gs://bucket/', with_metadata=True)
@mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
@@ -114,14 +117,14 @@ class GCSFileSystemTest(unittest.TestCase):
gcsio_mock = mock.MagicMock()
gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
exception = IOError('Failed')
- gcsio_mock.list_prefix.side_effect = exception
+ gcsio_mock.list_files.side_effect = exception
with self.assertRaisesRegex(BeamIOError,
r'^Match operation failed') as error:
self.fs.match(['gs://bucket/'])
self.assertRegex(
str(error.exception.exception_details), r'gs://bucket/.*%s' % exception)
- gcsio_mock.list_prefix.assert_called_once_with(
+ gcsio_mock.list_files.assert_called_once_with(
'gs://bucket/', with_metadata=True)
@mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
@@ -129,13 +132,9 @@ class GCSFileSystemTest(unittest.TestCase):
# Prepare mocks.
gcsio_mock = mock.MagicMock()
gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
- gcsio_mock.list_prefix.side_effect = [
- {
- 'gs://bucket/file1': (1, 99999.0)
- },
- {
- 'gs://bucket/file2': (2, 88888.0)
- },
+ gcsio_mock.list_files.side_effect = [
+ iter([('gs://bucket/file1', (1, 99999.0))]),
+ iter([('gs://bucket/file2', (2, 88888.0))]),
]
expected_results = [[FileMetadata('gs://bucket/file1', 1, 99999.0)],
[FileMetadata('gs://bucket/file2', 2, 88888.0)]]
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index e34a0b77453..5aee4f5aaf3 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -53,6 +53,7 @@ from apache_beam.io.gcp import resource_identifiers
from apache_beam.metrics import monitoring_infos
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.utils import retry
+from apache_beam.utils.annotations import deprecated
__all__ = ['GcsIO']
@@ -569,11 +570,13 @@ class GcsIO(object):
bucket=bucket, object=object_path)
return self.client.objects.Get(request)
- @retry.with_exponential_backoff(
- retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+ @deprecated(since='2.45.0', current='list_files')
def list_prefix(self, path, with_metadata=False):
"""Lists files matching the prefix.
+ ``list_prefix`` has been deprecated. Use `list_files` instead, which returns
+ a generator of file information instead of a dict.
+
Args:
path: GCS file path pattern in the form gs://<bucket>/[name].
with_metadata: Experimental. Specify whether returns file metadata.
@@ -582,9 +585,27 @@ class GcsIO(object):
If ``with_metadata`` is False: dict of file name -> size; if
``with_metadata`` is True: dict of file name -> tuple(size, timestamp).
"""
+ file_info = {}
+ for file_metadata in self.list_files(path, with_metadata):
+ file_info[file_metadata[0]] = file_metadata[1]
+
+ return file_info
+
+ def list_files(self, path, with_metadata=False):
+ """Lists files matching the prefix.
+
+ Args:
+ path: GCS file path pattern in the form gs://<bucket>/[name].
+ with_metadata: Experimental. Specify whether returns file metadata.
+
+ Returns:
+ If ``with_metadata`` is False: generator of tuple(file name, size); if
+ ``with_metadata`` is True: generator of
+ tuple(file name, tuple(size, timestamp)).
+ """
bucket, prefix = parse_gcs_path(path, object_optional=True)
request = storage.StorageObjectsListRequest(bucket=bucket, prefix=prefix)
- file_info = {}
+ file_info = set()
counter = 0
start_time = time.time()
if with_metadata:
@@ -592,22 +613,30 @@ class GcsIO(object):
else:
_LOGGER.debug("Starting the size estimation of the input")
while True:
- response = self.client.objects.List(request)
+ response = retry.with_exponential_backoff(
+ retry_filter=retry.retry_on_server_errors_and_timeout_filter)(
+ self.client.objects.List)(
+ request)
+
for item in response.items:
file_name = 'gs://%s/%s' % (item.bucket, item.name)
- if with_metadata:
- file_info[file_name] = (
- item.size, self._updated_to_seconds(item.updated))
- else:
- file_info[file_name] = item.size
- counter += 1
- if counter % 10000 == 0:
+ if file_name not in file_info:
+ file_info.add(file_name)
+ counter += 1
+ if counter % 10000 == 0:
+ if with_metadata:
+ _LOGGER.info(
+ "Finished computing file information of: %s files",
+ len(file_info))
+ else:
+ _LOGGER.info(
+ "Finished computing size of: %s files", len(file_info))
+
if with_metadata:
- _LOGGER.info(
- "Finished computing file information of: %s files",
- len(file_info))
+ yield file_name, (item.size, self._updated_to_seconds(item.updated))
else:
- _LOGGER.info("Finished computing size of: %s files", len(file_info))
+ yield file_name, item.size
+
if response.nextPageToken:
request.pageToken = response.nextPageToken
else:
@@ -618,7 +647,6 @@ class GcsIO(object):
"Finished listing %s files in %s seconds.",
counter,
time.time() - start_time)
- return file_info
@staticmethod
def _updated_to_seconds(updated):