Posted to commits@beam.apache.org by yh...@apache.org on 2022/12/01 22:43:22 UTC

[beam] branch master updated: Reduce calls to FileSystem.match and API calls in FileSystem._list (#24317)

This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 03052670499 Reduce calls to FileSystem.match and API calls in FileSystem._list (#24317)
03052670499 is described below

commit 03052670499ad953056dd55914d58c0c49961171
Author: Yi Hu <ya...@google.com>
AuthorDate: Thu Dec 1 17:43:15 2022 -0500

    Reduce calls to FileSystem.match and API calls in FileSystem._list (#24317)
    
    * Reuse concat_source in FileBasedSource.estimate_size
    
    * Create list_files to replace list_prefix, performing lazy listing
      of files in GcsIO, S3IO, and BlobStorageIO
---
 sdks/python/apache_beam/io/aws/s3filesystem.py     |  4 +-
 .../python/apache_beam/io/aws/s3filesystem_test.py | 30 +++++------
 sdks/python/apache_beam/io/aws/s3io.py             | 58 +++++++++++++++------
 .../apache_beam/io/azure/blobstoragefilesystem.py  |  4 +-
 .../io/azure/blobstoragefilesystem_test.py         | 32 +++++-------
 sdks/python/apache_beam/io/azure/blobstorageio.py  | 49 +++++++++++++-----
 sdks/python/apache_beam/io/filebasedsource.py      |  5 +-
 sdks/python/apache_beam/io/gcp/gcsfilesystem.py    |  4 +-
 .../apache_beam/io/gcp/gcsfilesystem_test.py       | 29 +++++------
 sdks/python/apache_beam/io/gcp/gcsio.py            | 60 ++++++++++++++++------
 10 files changed, 171 insertions(+), 104 deletions(-)
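
The net effect on all three IO classes is the same: list_prefix, which
eagerly built a dict of every object under the prefix before returning, is
kept only as a deprecated wrapper, while the new list_files yields one
result at a time, so callers pay for listing only as far as they iterate.
A minimal sketch of the two call styles, assuming valid pipeline options
and a reachable bucket:

    from apache_beam.io.aws import s3io
    from apache_beam.options.pipeline_options import PipelineOptions

    s3 = s3io.S3IO(options=PipelineOptions())

    # Deprecated: materializes the full listing into a dict up front.
    sizes = s3.list_prefix('s3://bucket/prefix')  # {name: size, ...}

    # New: a generator; pages are fetched from S3 only as it is consumed.
    for name, size in s3.list_files('s3://bucket/prefix'):
      print(name, size)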

diff --git a/sdks/python/apache_beam/io/aws/s3filesystem.py b/sdks/python/apache_beam/io/aws/s3filesystem.py
index 68d8ea7e313..636b0a12f3e 100644
--- a/sdks/python/apache_beam/io/aws/s3filesystem.py
+++ b/sdks/python/apache_beam/io/aws/s3filesystem.py
@@ -128,8 +128,8 @@ class S3FileSystem(FileSystem):
       ``BeamIOError``: if listing fails, but not if no files were found.
     """
     try:
-      for path, (size, updated) in s3io.S3IO(options=self._options) \
-        .list_prefix(dir_or_prefix, with_metadata=True).items():
+      for path, (size, updated) in s3io.S3IO(options=self._options).list_files(
+          dir_or_prefix, with_metadata=True):
         yield FileMetadata(path, size, updated)
     except Exception as e:  # pylint: disable=broad-except
       raise BeamIOError("List operation failed", {dir_or_prefix: e})
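
Because _list now streams FileMetadata from the list_files generator, a
match with a limit stops pulling pages once the limit is satisfied instead
of listing the whole prefix first. A sketch of that effect, assuming an
S3FileSystem built with valid options:

    from apache_beam.io.aws.s3filesystem import S3FileSystem
    from apache_beam.options.pipeline_options import PipelineOptions

    fs = S3FileSystem(PipelineOptions())
    # With list_prefix this fetched every page under the prefix before
    # truncating; with list_files only enough pages for 10 files are read.
    match_result = fs.match(['s3://bucket/'], [10])[0]
    assert len(match_result.metadata_list) <= 10
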
diff --git a/sdks/python/apache_beam/io/aws/s3filesystem_test.py b/sdks/python/apache_beam/io/aws/s3filesystem_test.py
index dd7209f0cbb..60e6f319b2c 100644
--- a/sdks/python/apache_beam/io/aws/s3filesystem_test.py
+++ b/sdks/python/apache_beam/io/aws/s3filesystem_test.py
@@ -95,10 +95,10 @@ class S3FileSystemTest(unittest.TestCase):
     # Prepare mocks.
     s3io_mock = mock.MagicMock()
     s3filesystem.s3io.S3IO = lambda options: s3io_mock  # type: ignore[misc]
-    s3io_mock.list_prefix.return_value = {
-        's3://bucket/file1': (1, 9999999.0),
-        's3://bucket/file2': (2, 8888888.0)
-    }
+    s3io_mock.list_files.return_value = iter([
+        ('s3://bucket/file1', (1, 9999999.0)),
+        ('s3://bucket/file2', (2, 8888888.0))
+    ])
     expected_results = set([
         FileMetadata('s3://bucket/file1', 1, 9999999.0),
         FileMetadata('s3://bucket/file2', 2, 8888888.0)
@@ -106,7 +106,7 @@ class S3FileSystemTest(unittest.TestCase):
     match_result = self.fs.match(['s3://bucket/'])[0]
 
     self.assertEqual(set(match_result.metadata_list), expected_results)
-    s3io_mock.list_prefix.assert_called_once_with(
+    s3io_mock.list_files.assert_called_once_with(
         's3://bucket/', with_metadata=True)
 
   @mock.patch('apache_beam.io.aws.s3filesystem.s3io')
@@ -115,12 +115,14 @@ class S3FileSystemTest(unittest.TestCase):
     s3io_mock = mock.MagicMock()
     limit = 1
     s3filesystem.s3io.S3IO = lambda options: s3io_mock  # type: ignore[misc]
-    s3io_mock.list_prefix.return_value = {'s3://bucket/file1': (1, 99999.0)}
+    s3io_mock.list_files.return_value = iter([
+        ('s3://bucket/file1', (1, 99999.0))
+    ])
     expected_results = set([FileMetadata('s3://bucket/file1', 1, 99999.0)])
     match_result = self.fs.match(['s3://bucket/'], [limit])[0]
     self.assertEqual(set(match_result.metadata_list), expected_results)
     self.assertEqual(len(match_result.metadata_list), limit)
-    s3io_mock.list_prefix.assert_called_once_with(
+    s3io_mock.list_files.assert_called_once_with(
         's3://bucket/', with_metadata=True)
 
   @mock.patch('apache_beam.io.aws.s3filesystem.s3io')
@@ -129,13 +131,13 @@ class S3FileSystemTest(unittest.TestCase):
     s3io_mock = mock.MagicMock()
     s3filesystem.s3io.S3IO = lambda options: s3io_mock  # type: ignore[misc]
     exception = IOError('Failed')
-    s3io_mock.list_prefix.side_effect = exception
+    s3io_mock.list_files.side_effect = exception
 
     with self.assertRaises(BeamIOError) as error:
       self.fs.match(['s3://bucket/'])
 
     self.assertIn('Match operation failed', str(error.exception))
-    s3io_mock.list_prefix.assert_called_once_with(
+    s3io_mock.list_files.assert_called_once_with(
         's3://bucket/', with_metadata=True)
 
   @mock.patch('apache_beam.io.aws.s3filesystem.s3io')
@@ -143,13 +145,9 @@ class S3FileSystemTest(unittest.TestCase):
     # Prepare mocks.
     s3io_mock = mock.MagicMock()
     s3filesystem.s3io.S3IO = lambda options: s3io_mock  # type: ignore[misc]
-    s3io_mock.list_prefix.side_effect = [
-        {
-            's3://bucket/file1': (1, 99999.0)
-        },
-        {
-            's3://bucket/file2': (2, 88888.0)
-        },
+    s3io_mock.list_files.side_effect = [
+        iter([('s3://bucket/file1', (1, 99999.0))]),
+        iter([('s3://bucket/file2', (2, 88888.0))]),
     ]
     expected_results = [[FileMetadata('s3://bucket/file1', 1, 99999.0)],
                         [FileMetadata('s3://bucket/file2', 2, 88888.0)]]
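
A note on the test changes: a plain iterator is exhausted after one pass,
so a MagicMock whose return_value is iter(...) only answers a single call
with data. That is why the multiple-pattern test above switches to
side_effect, which hands out one fresh iterator per call. The behavior in
isolation:

    from unittest import mock

    m = mock.MagicMock()
    m.list_files.return_value = iter([('s3://bucket/file1', 1)])
    assert list(m.list_files('s3://bucket/')) == [('s3://bucket/file1', 1)]
    assert list(m.list_files('s3://bucket/')) == []  # already exhausted

    m.list_files.side_effect = [
        iter([('s3://bucket/file1', 1)]),
        iter([('s3://bucket/file2', 2)]),
    ]
    assert list(m.list_files('a')) == [('s3://bucket/file1', 1)]
    assert list(m.list_files('b')) == [('s3://bucket/file2', 2)]
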
diff --git a/sdks/python/apache_beam/io/aws/s3io.py b/sdks/python/apache_beam/io/aws/s3io.py
index 66bdaa52e90..887bb4c7baa 100644
--- a/sdks/python/apache_beam/io/aws/s3io.py
+++ b/sdks/python/apache_beam/io/aws/s3io.py
@@ -33,6 +33,7 @@ from apache_beam.io.filesystemio import DownloaderStream
 from apache_beam.io.filesystemio import Uploader
 from apache_beam.io.filesystemio import UploaderStream
 from apache_beam.utils import retry
+from apache_beam.utils.annotations import deprecated
 
 try:
   # pylint: disable=wrong-import-order, wrong-import-position
@@ -99,11 +100,13 @@ class S3IO(object):
     else:
       raise ValueError('Invalid file open mode: %s.' % mode)
 
-  @retry.with_exponential_backoff(
-      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+  @deprecated(since='2.45.0', current='list_files')
   def list_prefix(self, path, with_metadata=False):
     """Lists files matching the prefix.
 
+    ``list_prefix`` has been deprecated. Use ``list_files`` instead, which
+    returns a generator of file information instead of a dict.
+
     Args:
       path: S3 file path pattern in the form s3://<bucket>/[name].
       with_metadata: Experimental. Specify whether to return file metadata.
@@ -112,10 +115,28 @@ class S3IO(object):
       If ``with_metadata`` is False: dict of file name -> size; if
         ``with_metadata`` is True: dict of file name -> tuple(size, timestamp).
     """
+    file_info = {}
+    for file_metadata in self.list_files(path, with_metadata):
+      file_info[file_metadata[0]] = file_metadata[1]
+
+    return file_info
+
+  def list_files(self, path, with_metadata=False):
+    """Lists files matching the prefix.
+
+    Args:
+      path: S3 file path pattern in the form s3://<bucket>/[name].
+      with_metadata: Experimental. Specify whether to return file metadata.
+
+    Returns:
+      If ``with_metadata`` is False: generator of tuple(file name, size); if
+      ``with_metadata`` is True: generator of
+      tuple(file name, tuple(size, timestamp)).
+    """
     bucket, prefix = parse_s3_path(path, object_optional=True)
     request = messages.ListRequest(bucket=bucket, prefix=prefix)
 
-    file_info = {}
+    file_info = set()
     counter = 0
     start_time = time.time()
 
@@ -130,7 +151,10 @@ class S3IO(object):
       #This should not be an issue here.
       #Ignore this exception or it will break the procedure.
       try:
-        response = self.client.list(request)
+        response = retry.with_exponential_backoff(
+            retry_filter=retry.retry_on_server_errors_and_timeout_filter)(
+                self.client.list)(
+                    request)
       except messages.S3ClientError as e:
         if e.code == 404:
           break
@@ -139,19 +163,23 @@ class S3IO(object):
 
       for item in response.items:
         file_name = 's3://%s/%s' % (bucket, item.key)
-        if with_metadata:
-          file_info[file_name] = (
-              item.size, self._updated_to_seconds(item.last_modified))
-        else:
-          file_info[file_name] = item.size
-        counter += 1
-        if counter % 10000 == 0:
+        if file_name not in file_info:
+          file_info.add(file_name)
+          counter += 1
+          if counter % 10000 == 0:
+            if with_metadata:
+              logging.info(
+                  "Finished computing file information of: %s files",
+                  len(file_info))
+            else:
+              logging.info(
+                  "Finished computing size of: %s files", len(file_info))
           if with_metadata:
-            logging.info(
-                "Finished computing file information of: %s files",
-                len(file_info))
+            yield file_name, (
+                item.size, self._updated_to_seconds(item.last_modified))
           else:
-            logging.info("Finished computing size of: %s files", len(file_info))
+            yield file_name, item.size
+
       if response.next_token:
         request.continuation_token = response.next_token
       else:
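
The retry handling moves because list_files is now a generator: left in the
decorator position, with_exponential_backoff would wrap only the call that
creates the generator object, which returns immediately without touching
S3, so the paged client.list calls made during iteration would run
unretried. Wrapping the bound method per page keeps each request under
backoff. A self-contained sketch of the shape, with a fake paged client
standing in for self.client.list:

    from collections import namedtuple

    from apache_beam.utils import retry

    Page = namedtuple('Page', ['items', 'next_token'])
    _PAGES = {None: Page(['a', 'b'], 't1'), 't1': Page(['c'], None)}

    def fetch_page(token):  # stand-in for self.client.list(request)
      return _PAGES[token]

    def list_files_sketch():
      token = None
      while True:
        # Each page request is retried independently with backoff.
        page = retry.with_exponential_backoff(
            retry_filter=retry.retry_on_server_errors_and_timeout_filter)(
                fetch_page)(token)
        for item in page.items:
          yield item
        if not page.next_token:
          break
        token = page.next_token

    assert list(list_files_sketch()) == ['a', 'b', 'c']
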
diff --git a/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py b/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
index cc678f9ae0f..8bc3dd68281 100644
--- a/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
+++ b/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
@@ -122,8 +122,8 @@ class BlobStorageFileSystem(FileSystem):
       ``BeamIOError``: if listing fails, but not if no files were found.
     """
     try:
-      for path, (size, updated) in self._blobstorageIO() \
-        .list_prefix(dir_or_prefix, with_metadata=True).items():
+      for path, (size, updated) in self._blobstorageIO().list_files(
+          dir_or_prefix, with_metadata=True):
         yield FileMetadata(path, size, updated)
     except Exception as e:  # pylint: disable=broad-except
       raise BeamIOError("List operation failed", {dir_or_prefix: e})
diff --git a/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py b/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py
index 6c844329867..cee459f5b8a 100644
--- a/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py
+++ b/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py
@@ -108,10 +108,10 @@ class BlobStorageFileSystemTest(unittest.TestCase):
     blobstorageio_mock = mock.MagicMock()
     blobstoragefilesystem.blobstorageio.BlobStorageIO = \
         lambda pipeline_options: blobstorageio_mock
-    blobstorageio_mock.list_prefix.return_value = {
-        'azfs://storageaccount/container/file1': (1, 99999.0),
-        'azfs://storageaccount/container/file2': (2, 88888.0)
-    }
+    blobstorageio_mock.list_files.return_value = iter([
+        ('azfs://storageaccount/container/file1', (1, 99999.0)),
+        ('azfs://storageaccount/container/file2', (2, 88888.0))
+    ])
     expected_results = set([
         FileMetadata('azfs://storageaccount/container/file1', 1, 99999.0),
         FileMetadata('azfs://storageaccount/container/file2', 2, 88888.0),
@@ -119,7 +119,7 @@ class BlobStorageFileSystemTest(unittest.TestCase):
     match_result = self.fs.match(['azfs://storageaccount/container/'])[0]
 
     self.assertEqual(set(match_result.metadata_list), expected_results)
-    blobstorageio_mock.list_prefix.assert_called_once_with(
+    blobstorageio_mock.list_files.assert_called_once_with(
         'azfs://storageaccount/container/', with_metadata=True)
 
   @mock.patch('apache_beam.io.azure.blobstoragefilesystem.blobstorageio')
@@ -129,16 +129,16 @@ class BlobStorageFileSystemTest(unittest.TestCase):
     limit = 1
     blobstoragefilesystem.blobstorageio.BlobStorageIO = \
         lambda pipeline_options: blobstorageio_mock
-    blobstorageio_mock.list_prefix.return_value = {
-        'azfs://storageaccount/container/file1': (1, 99999.0)
-    }
+    blobstorageio_mock.list_files.return_value = iter([
+        ('azfs://storageaccount/container/file1', (1, 99999.0))
+    ])
     expected_results = set(
         [FileMetadata('azfs://storageaccount/container/file1', 1, 99999.0)])
     match_result = self.fs.match(['azfs://storageaccount/container/'],
                                  [limit])[0]
     self.assertEqual(set(match_result.metadata_list), expected_results)
     self.assertEqual(len(match_result.metadata_list), limit)
-    blobstorageio_mock.list_prefix.assert_called_once_with(
+    blobstorageio_mock.list_files.assert_called_once_with(
         'azfs://storageaccount/container/', with_metadata=True)
 
   @mock.patch('apache_beam.io.azure.blobstoragefilesystem.blobstorageio')
@@ -148,7 +148,7 @@ class BlobStorageFileSystemTest(unittest.TestCase):
     blobstoragefilesystem.blobstorageio.BlobStorageIO = \
         lambda pipeline_options: blobstorageio_mock
     exception = IOError('Failed')
-    blobstorageio_mock.list_prefix.side_effect = exception
+    blobstorageio_mock.list_files.side_effect = exception
 
     with self.assertRaisesRegex(BeamIOError,
                                 r'^Match operation failed') as error:
@@ -157,7 +157,7 @@ class BlobStorageFileSystemTest(unittest.TestCase):
     self.assertRegex(
         str(error.exception.exception_details),
         r'azfs://storageaccount/container/.*%s' % exception)
-    blobstorageio_mock.list_prefix.assert_called_once_with(
+    blobstorageio_mock.list_files.assert_called_once_with(
         'azfs://storageaccount/container/', with_metadata=True)
 
   @mock.patch('apache_beam.io.azure.blobstoragefilesystem.blobstorageio')
@@ -166,13 +166,9 @@ class BlobStorageFileSystemTest(unittest.TestCase):
     blobstorageio_mock = mock.MagicMock()
     blobstoragefilesystem.blobstorageio.BlobStorageIO = \
         lambda pipeline_options: blobstorageio_mock
-    blobstorageio_mock.list_prefix.side_effect = [
-        {
-            'azfs://storageaccount/container/file1': (1, 99999.0)
-        },
-        {
-            'azfs://storageaccount/container/file2': (2, 88888.0)
-        },
+    blobstorageio_mock.list_files.side_effect = [
+        iter([('azfs://storageaccount/container/file1', (1, 99999.0))]),
+        iter([('azfs://storageaccount/container/file2', (2, 88888.0))]),
     ]
     expected_results = [
         [FileMetadata('azfs://storageaccount/container/file1', 1, 99999.0)],
diff --git a/sdks/python/apache_beam/io/azure/blobstorageio.py b/sdks/python/apache_beam/io/azure/blobstorageio.py
index 948dda32b8f..cfa4fe7d291 100644
--- a/sdks/python/apache_beam/io/azure/blobstorageio.py
+++ b/sdks/python/apache_beam/io/azure/blobstorageio.py
@@ -35,6 +35,7 @@ from apache_beam.io.filesystemio import Uploader
 from apache_beam.io.filesystemio import UploaderStream
 from apache_beam.options.pipeline_options import AzureOptions
 from apache_beam.utils import retry
+from apache_beam.utils.annotations import deprecated
 
 _LOGGER = logging.getLogger(__name__)
 
@@ -578,8 +579,7 @@ class BlobStorageIO(object):
 
     return results
 
-  @retry.with_exponential_backoff(
-      retry_filter=retry.retry_on_beam_io_error_filter)
+  @deprecated(since='2.45.0', current='list_files')
   def list_prefix(self, path, with_metadata=False):
     """Lists files matching the prefix.
 
@@ -592,9 +592,28 @@ class BlobStorageIO(object):
       If ``with_metadata`` is False: dict of file name -> size; if
         ``with_metadata`` is True: dict of file name -> tuple(size, timestamp).
     """
+    file_info = {}
+    for file_metadata in self.list_files(path, with_metadata):
+      file_info[file_metadata[0]] = file_metadata[1]
+
+    return file_info
+
+  def list_files(self, path, with_metadata=False):
+    """Lists files matching the prefix.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+      with_metadata: Experimental. Specify whether to return file metadata.
+
+    Returns:
+      If ``with_metadata`` is False: generator of tuple(file name, size); if
+      ``with_metadata`` is True: generator of
+      tuple(file name, tuple(size, timestamp)).
+    """
     storage_account, container, blob = parse_azfs_path(
         path, blob_optional=True, get_account=True)
-    file_info = {}
+    file_info = set()
     counter = 0
     start_time = time.time()
 
@@ -604,15 +623,14 @@ class BlobStorageIO(object):
       logging.debug("Starting the size estimation of the input")
     container_client = self.client.get_container_client(container)
 
-    while True:
-      response = container_client.list_blobs(name_starts_with=blob)
-      for item in response:
-        file_name = "azfs://%s/%s/%s" % (storage_account, container, item.name)
-        if with_metadata:
-          file_info[file_name] = (
-              item.size, self._updated_to_seconds(item.last_modified))
-        else:
-          file_info[file_name] = item.size
+    response = retry.with_exponential_backoff(
+        retry_filter=retry.retry_on_beam_io_error_filter)(
+            container_client.list_blobs)(
+                name_starts_with=blob)
+    for item in response:
+      file_name = "azfs://%s/%s/%s" % (storage_account, container, item.name)
+      if file_name not in file_info:
+        file_info.add(file_name)
         counter += 1
         if counter % 10000 == 0:
           if with_metadata:
@@ -621,7 +639,11 @@ class BlobStorageIO(object):
                 len(file_info))
           else:
             logging.info("Finished computing size of: %s files", len(file_info))
-      break
+        if with_metadata:
+          yield file_name, (
+              item.size, self._updated_to_seconds(item.last_modified))
+        else:
+          yield file_name, item.size
 
     logging.log(
         # do not spam logs when list_prefix is likely used to check empty folder
@@ -629,7 +651,6 @@ class BlobStorageIO(object):
         "Finished listing %s files in %s seconds.",
         counter,
         time.time() - start_time)
-    return file_info
 
 
 class BlobStorageDownloader(Downloader):
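
In all three IOs the dict that used to accumulate every result shrinks to a
set of names, kept only to skip a name that was already yielded and to
drive the every-10000-files progress log; sizes and timestamps are no
longer held in memory. The pattern, reduced to its essentials:

    def unique_files(pages):
      """Yield (name, size) at most once per name, logging progress."""
      seen = set()
      counter = 0
      for page in pages:
        for name, size in page:
          if name in seen:
            continue
          seen.add(name)
          counter += 1
          if counter % 10000 == 0:
            print('Finished computing size of: %s files' % counter)
          yield name, size

    pages = [[('a', 1), ('b', 2)], [('a', 1), ('c', 3)]]
    assert list(unique_files(pages)) == [('a', 1), ('b', 2), ('c', 3)]
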
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index e2cd26868e5..240fc65c52b 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -196,11 +196,8 @@ class FileBasedSource(iobase.BoundedSource):
         start_position=start_position,
         stop_position=stop_position)
 
-  @check_accessible(['_pattern'])
   def estimate_size(self):
-    pattern = self._pattern.get()
-    match_result = FileSystems.match([pattern])[0]
-    return sum([f.size_in_bytes for f in match_result.metadata_list])
+    return self._get_concat_source().estimate_size()
 
   def read(self, range_tracker):
     return self._get_concat_source().read(range_tracker)
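
The estimate_size change removes a second FileSystems.match for the same
pattern: _get_concat_source memoizes the concatenated source it builds from
its one match, so a later size estimate sums the sizes captured when the
per-file sources were created instead of issuing fresh listing calls. A
hypothetical stand-in (not the real FileBasedSource) showing the reuse:

    from apache_beam.io.filesystems import FileSystems

    class PatternSize(object):
      """Hypothetical sketch: match once, answer size queries from cache."""
      def __init__(self, pattern):
        self._pattern = pattern
        self._metadata = None  # filled on first use, then reused

      def _match_once(self):
        if self._metadata is None:
          self._metadata = FileSystems.match([self._pattern])[0].metadata_list
        return self._metadata

      def estimate_size(self):
        # No new listing if _match_once already ran during source expansion.
        return sum(f.size_in_bytes for f in self._match_once())
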
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
index 11184cd34fd..c87a8499c91 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -131,8 +131,8 @@ class GCSFileSystem(FileSystem):
       ``BeamIOError``: if listing fails, but not if no files were found.
     """
     try:
-      for path, (size, updated) in self._gcsIO().list_prefix(
-          dir_or_prefix, with_metadata=True).items():
+      for path, (size, updated) in self._gcsIO().list_files(dir_or_prefix,
+                                                            with_metadata=True):
         yield FileMetadata(path, size, updated)
     except Exception as e:  # pylint: disable=broad-except
       raise BeamIOError("List operation failed", {dir_or_prefix: e})
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
index 49b0bdc9f6c..800bd5d1c46 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
@@ -82,16 +82,17 @@ class GCSFileSystemTest(unittest.TestCase):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
     gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
-    gcsio_mock.list_prefix.return_value = {
-        'gs://bucket/file1': (1, 99999.0), 'gs://bucket/file2': (2, 88888.0)
-    }
+    gcsio_mock.list_files.return_value = iter([
+        ('gs://bucket/file1', (1, 99999.0)),
+        ('gs://bucket/file2', (2, 88888.0))
+    ])
     expected_results = set([
         FileMetadata('gs://bucket/file1', 1, 99999.0),
         FileMetadata('gs://bucket/file2', 2, 88888.0)
     ])
     match_result = self.fs.match(['gs://bucket/'])[0]
     self.assertEqual(set(match_result.metadata_list), expected_results)
-    gcsio_mock.list_prefix.assert_called_once_with(
+    gcsio_mock.list_files.assert_called_once_with(
         'gs://bucket/', with_metadata=True)
 
   @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
@@ -100,12 +101,14 @@ class GCSFileSystemTest(unittest.TestCase):
     gcsio_mock = mock.MagicMock()
     limit = 1
     gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
-    gcsio_mock.list_prefix.return_value = {'gs://bucket/file1': (1, 99999.0)}
+    gcsio_mock.list_files.return_value = iter([
+        ('gs://bucket/file1', (1, 99999.0))
+    ])
     expected_results = set([FileMetadata('gs://bucket/file1', 1, 99999.0)])
     match_result = self.fs.match(['gs://bucket/'], [limit])[0]
     self.assertEqual(set(match_result.metadata_list), expected_results)
     self.assertEqual(len(match_result.metadata_list), limit)
-    gcsio_mock.list_prefix.assert_called_once_with(
+    gcsio_mock.list_files.assert_called_once_with(
         'gs://bucket/', with_metadata=True)
 
   @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
@@ -114,14 +117,14 @@ class GCSFileSystemTest(unittest.TestCase):
     gcsio_mock = mock.MagicMock()
     gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
     exception = IOError('Failed')
-    gcsio_mock.list_prefix.side_effect = exception
+    gcsio_mock.list_files.side_effect = exception
 
     with self.assertRaisesRegex(BeamIOError,
                                 r'^Match operation failed') as error:
       self.fs.match(['gs://bucket/'])
     self.assertRegex(
         str(error.exception.exception_details), r'gs://bucket/.*%s' % exception)
-    gcsio_mock.list_prefix.assert_called_once_with(
+    gcsio_mock.list_files.assert_called_once_with(
         'gs://bucket/', with_metadata=True)
 
   @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
@@ -129,13 +132,9 @@ class GCSFileSystemTest(unittest.TestCase):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
     gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
-    gcsio_mock.list_prefix.side_effect = [
-        {
-            'gs://bucket/file1': (1, 99999.0)
-        },
-        {
-            'gs://bucket/file2': (2, 88888.0)
-        },
+    gcsio_mock.list_files.side_effect = [
+        iter([('gs://bucket/file1', (1, 99999.0))]),
+        iter([('gs://bucket/file2', (2, 88888.0))]),
     ]
     expected_results = [[FileMetadata('gs://bucket/file1', 1, 99999.0)],
                         [FileMetadata('gs://bucket/file2', 2, 88888.0)]]
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index e34a0b77453..5aee4f5aaf3 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -53,6 +53,7 @@ from apache_beam.io.gcp import resource_identifiers
 from apache_beam.metrics import monitoring_infos
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.utils import retry
+from apache_beam.utils.annotations import deprecated
 
 __all__ = ['GcsIO']
 
@@ -569,11 +570,13 @@ class GcsIO(object):
         bucket=bucket, object=object_path)
     return self.client.objects.Get(request)
 
-  @retry.with_exponential_backoff(
-      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+  @deprecated(since='2.45.0', current='list_files')
   def list_prefix(self, path, with_metadata=False):
     """Lists files matching the prefix.
 
+    ``list_prefix`` has been deprecated. Use ``list_files`` instead, which
+    returns a generator of file information instead of a dict.
+
     Args:
       path: GCS file path pattern in the form gs://<bucket>/[name].
       with_metadata: Experimental. Specify whether to return file metadata.
@@ -582,9 +585,27 @@ class GcsIO(object):
       If ``with_metadata`` is False: dict of file name -> size; if
         ``with_metadata`` is True: dict of file name -> tuple(size, timestamp).
     """
+    file_info = {}
+    for file_metadata in self.list_files(path, with_metadata):
+      file_info[file_metadata[0]] = file_metadata[1]
+
+    return file_info
+
+  def list_files(self, path, with_metadata=False):
+    """Lists files matching the prefix.
+
+    Args:
+      path: GCS file path pattern in the form gs://<bucket>/[name].
+      with_metadata: Experimental. Specify whether to return file metadata.
+
+    Returns:
+      If ``with_metadata`` is False: generator of tuple(file name, size); if
+      ``with_metadata`` is True: generator of
+      tuple(file name, tuple(size, timestamp)).
+    """
     bucket, prefix = parse_gcs_path(path, object_optional=True)
     request = storage.StorageObjectsListRequest(bucket=bucket, prefix=prefix)
-    file_info = {}
+    file_info = set()
     counter = 0
     start_time = time.time()
     if with_metadata:
@@ -592,22 +613,30 @@ class GcsIO(object):
     else:
       _LOGGER.debug("Starting the size estimation of the input")
     while True:
-      response = self.client.objects.List(request)
+      response = retry.with_exponential_backoff(
+          retry_filter=retry.retry_on_server_errors_and_timeout_filter)(
+              self.client.objects.List)(
+                  request)
+
       for item in response.items:
         file_name = 'gs://%s/%s' % (item.bucket, item.name)
-        if with_metadata:
-          file_info[file_name] = (
-              item.size, self._updated_to_seconds(item.updated))
-        else:
-          file_info[file_name] = item.size
-        counter += 1
-        if counter % 10000 == 0:
+        if file_name not in file_info:
+          file_info.add(file_name)
+          counter += 1
+          if counter % 10000 == 0:
+            if with_metadata:
+              _LOGGER.info(
+                  "Finished computing file information of: %s files",
+                  len(file_info))
+            else:
+              _LOGGER.info(
+                  "Finished computing size of: %s files", len(file_info))
+
           if with_metadata:
-            _LOGGER.info(
-                "Finished computing file information of: %s files",
-                len(file_info))
+            yield file_name, (item.size, self._updated_to_seconds(item.updated))
           else:
-            _LOGGER.info("Finished computing size of: %s files", len(file_info))
+            yield file_name, item.size
+
       if response.nextPageToken:
         request.pageToken = response.nextPageToken
       else:
@@ -618,7 +647,6 @@ class GcsIO(object):
         "Finished listing %s files in %s seconds.",
         counter,
         time.time() - start_time)
-    return file_info
 
   @staticmethod
   def _updated_to_seconds(updated):
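
For callers that still need the old dict shape, the deprecated list_prefix
wrapper above simply drains the generator; dict() over the yielded pairs is
equivalent to its loop:

    # `io` is any of the three classes: S3IO, BlobStorageIO, or GcsIO.
    def list_prefix_compat(io, path, with_metadata=False):
      return dict(io.list_files(path, with_metadata=with_metadata))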