Posted to commits@beam.apache.org by pa...@apache.org on 2022/05/02 23:27:42 UTC

[beam] branch master updated: Merge pull request #17380 from [BEAM-14314][BEAM-9532] Add last_updated field in filesystem.FileMetaData

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0daef62a7bd Merge pull request #17380 from [BEAM-14314][BEAM-9532] Add last_updated field in filesystem.FileMetaData
0daef62a7bd is described below

commit 0daef62a7bd993b13064de80588e343ee764e004
Author: Yi Hu <ya...@google.com>
AuthorDate: Mon May 2 19:27:36 2022 -0400

    Merge pull request #17380 from [BEAM-14314][BEAM-9532] Add last_updated field in filesystem.FileMetaData
    
    * [BEAM-14314] Add last_updated field in filesystem.FileMetaData
    
    * Add last_updated_in_seconds field in FileMetaData and use it in match
    
    * Add metadata method for FileSystem implementations
    
    * Fix precommit
    
    * Fix style and docstring
    
    * Increase test coverage for metadata methods
    
    * Address comments and last_updated metadata improvements
    
    * Fix naming
    
    * Implement last_updated in hadoop filesystem
    
    * [BEAM-9532] fix issue with s3io_test last updated
    
    * Fix pylint and add back removed coverage
    
    * Fix hadoopfilesystem unit test
    
    * Address comments and fix flaky hadoopfilesystem unit test
    
    * Add comments to time assertion choice in test
    
    * Fix method _status renaming leftover
    
    * Also fix method _status renaming leftover in tests
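
As an illustrative sketch only (not part of this commit), the new field added
above surfaces to pipeline authors through the existing FileSystems.match()
API; the local glob below is hypothetical:

    from apache_beam.io.filesystems import FileSystems

    # Each FileMetadata in MatchResult.metadata_list now also carries
    # last_updated_in_seconds (0.0 when the FileSystem reports no
    # modification time).
    for metadata in FileSystems.match(['/tmp/input/*.txt'])[0].metadata_list:
      print(
          metadata.path,
          metadata.size_in_bytes,
          metadata.last_updated_in_seconds)
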
---
 .../apache_beam/io/aws/clients/s3/fake_client.py   |   6 +-
 sdks/python/apache_beam/io/aws/s3filesystem.py     |  25 ++++-
 .../python/apache_beam/io/aws/s3filesystem_test.py |  40 +++++---
 sdks/python/apache_beam/io/aws/s3io.py             |  98 +++++++++++++------
 sdks/python/apache_beam/io/aws/s3io_test.py        |  21 +++-
 .../apache_beam/io/azure/blobstoragefilesystem.py  |  25 ++++-
 .../io/azure/blobstoragefilesystem_test.py         |  45 ++++++---
 sdks/python/apache_beam/io/azure/blobstorageio.py  | 108 +++++++++++++--------
 sdks/python/apache_beam/io/filesystem.py           |  50 ++++++++--
 sdks/python/apache_beam/io/filesystem_test.py      |   3 +
 sdks/python/apache_beam/io/gcp/gcsfilesystem.py    |  24 ++++-
 .../apache_beam/io/gcp/gcsfilesystem_test.py       |  31 +++---
 sdks/python/apache_beam/io/gcp/gcsio.py            | 105 +++++++++++++-------
 sdks/python/apache_beam/io/gcp/gcsio_test.py       |  20 ++++
 sdks/python/apache_beam/io/hadoopfilesystem.py     |  52 ++++++++--
 .../python/apache_beam/io/hadoopfilesystem_test.py |  25 +++--
 sdks/python/apache_beam/io/localfilesystem.py      |  18 +++-
 sdks/python/apache_beam/io/localfilesystem_test.py |  10 +-
 18 files changed, 522 insertions(+), 184 deletions(-)
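
Illustrative only (not part of the patch): a hedged sketch of the experimental
with_metadata flag that this change adds to list_prefix() in the IO helpers
listed above (S3IO, GcsIO, BlobStorageIO); the bucket and prefix below are
hypothetical:

    from apache_beam.io.gcp import gcsio

    gcs = gcsio.GcsIO()
    # Default behaviour is unchanged: dict of file name -> size in bytes.
    sizes = gcs.list_prefix('gs://my-bucket/logs/')
    # With the experimental flag: dict of file name -> (size, last-updated
    # epoch seconds), which the match() path uses to populate FileMetadata.
    for name, (size, updated) in gcs.list_prefix(
        'gs://my-bucket/logs/', with_metadata=True).items():
      print(name, size, updated)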

diff --git a/sdks/python/apache_beam/io/aws/clients/s3/fake_client.py b/sdks/python/apache_beam/io/aws/clients/s3/fake_client.py
index b53ff32825a..e2d34a11a46 100644
--- a/sdks/python/apache_beam/io/aws/clients/s3/fake_client.py
+++ b/sdks/python/apache_beam/io/aws/clients/s3/fake_client.py
@@ -20,6 +20,8 @@
 import datetime
 import time
 
+import pytz
+
 from apache_beam.io.aws.clients.s3 import messages
 
 
@@ -39,8 +41,8 @@ class FakeFile(object):
   def get_metadata(self):
     last_modified_datetime = None
     if self.last_modified:
-      last_modified_datetime = datetime.datetime.utcfromtimestamp(
-          self.last_modified)
+      last_modified_datetime = datetime.datetime.fromtimestamp(
+          self.last_modified, pytz.utc)
 
     return messages.Item(
         self.etag,
diff --git a/sdks/python/apache_beam/io/aws/s3filesystem.py b/sdks/python/apache_beam/io/aws/s3filesystem.py
index 8a5e94e3fc7..68d8ea7e313 100644
--- a/sdks/python/apache_beam/io/aws/s3filesystem.py
+++ b/sdks/python/apache_beam/io/aws/s3filesystem.py
@@ -128,9 +128,9 @@ class S3FileSystem(FileSystem):
       ``BeamIOError``: if listing fails, but not if no files were found.
     """
     try:
-      for path, size in \
-        s3io.S3IO(options=self._options).list_prefix(dir_or_prefix).items():
-        yield FileMetadata(path, size)
+      for path, (size, updated) in s3io.S3IO(options=self._options) \
+        .list_prefix(dir_or_prefix, with_metadata=True).items():
+        yield FileMetadata(path, size, updated)
     except Exception as e:  # pylint: disable=broad-except
       raise BeamIOError("List operation failed", {dir_or_prefix: e})
 
@@ -281,6 +281,25 @@ class S3FileSystem(FileSystem):
     except Exception as e:  # pylint: disable=broad-except
       raise BeamIOError("Checksum operation failed", {path: e})
 
+  def metadata(self, path):
+    """Fetch metadata fields of a file on the FileSystem.
+
+    Args:
+      path: string path of a file.
+
+    Returns:
+      :class:`~apache_beam.io.filesystem.FileMetadata`.
+
+    Raises:
+      ``BeamIOError``: if path isn't a file or doesn't exist.
+    """
+    try:
+      file_metadata = s3io.S3IO(options=self._options)._status(path)
+      return FileMetadata(
+          path, file_metadata['size'], file_metadata['last_updated'])
+    except Exception as e:  # pylint: disable=broad-except
+      raise BeamIOError("Metadata operation failed", {path: e})
+
   def delete(self, paths):
     """Deletes files or directories at the provided paths.
     Directories will be deleted recursively.
diff --git a/sdks/python/apache_beam/io/aws/s3filesystem_test.py b/sdks/python/apache_beam/io/aws/s3filesystem_test.py
index eba67476f13..dd7209f0cbb 100644
--- a/sdks/python/apache_beam/io/aws/s3filesystem_test.py
+++ b/sdks/python/apache_beam/io/aws/s3filesystem_test.py
@@ -78,22 +78,36 @@ class S3FileSystemTest(unittest.TestCase):
     with self.assertRaises(ValueError):
       self.fs.split('/no/s3/prefix')
 
+  @mock.patch('apache_beam.io.aws.s3filesystem.s3io')
+  def test_match_single(self, unused_mock_arg):
+    # Prepare mocks.
+    s3io_mock = mock.MagicMock()
+    s3filesystem.s3io.S3IO = lambda options: s3io_mock  # type: ignore[misc]
+    s3io_mock._status.return_value = {'size': 1, 'last_updated': 9999999.0}
+    expected_results = [FileMetadata('s3://bucket/file1', 1, 9999999.0)]
+    match_result = self.fs.match(['s3://bucket/file1'])[0]
+
+    self.assertEqual(match_result.metadata_list, expected_results)
+    s3io_mock._status.assert_called_once_with('s3://bucket/file1')
+
   @mock.patch('apache_beam.io.aws.s3filesystem.s3io')
   def test_match_multiples(self, unused_mock_arg):
     # Prepare mocks.
     s3io_mock = mock.MagicMock()
     s3filesystem.s3io.S3IO = lambda options: s3io_mock  # type: ignore[misc]
     s3io_mock.list_prefix.return_value = {
-        's3://bucket/file1': 1, 's3://bucket/file2': 2
+        's3://bucket/file1': (1, 9999999.0),
+        's3://bucket/file2': (2, 8888888.0)
     }
     expected_results = set([
-        FileMetadata('s3://bucket/file1', 1),
-        FileMetadata('s3://bucket/file2', 2)
+        FileMetadata('s3://bucket/file1', 1, 9999999.0),
+        FileMetadata('s3://bucket/file2', 2, 8888888.0)
     ])
     match_result = self.fs.match(['s3://bucket/'])[0]
 
     self.assertEqual(set(match_result.metadata_list), expected_results)
-    s3io_mock.list_prefix.assert_called_once_with('s3://bucket/')
+    s3io_mock.list_prefix.assert_called_once_with(
+        's3://bucket/', with_metadata=True)
 
   @mock.patch('apache_beam.io.aws.s3filesystem.s3io')
   def test_match_multiples_limit(self, unused_mock_arg):
@@ -101,12 +115,13 @@ class S3FileSystemTest(unittest.TestCase):
     s3io_mock = mock.MagicMock()
     limit = 1
     s3filesystem.s3io.S3IO = lambda options: s3io_mock  # type: ignore[misc]
-    s3io_mock.list_prefix.return_value = {'s3://bucket/file1': 1}
-    expected_results = set([FileMetadata('s3://bucket/file1', 1)])
+    s3io_mock.list_prefix.return_value = {'s3://bucket/file1': (1, 99999.0)}
+    expected_results = set([FileMetadata('s3://bucket/file1', 1, 99999.0)])
     match_result = self.fs.match(['s3://bucket/'], [limit])[0]
     self.assertEqual(set(match_result.metadata_list), expected_results)
     self.assertEqual(len(match_result.metadata_list), limit)
-    s3io_mock.list_prefix.assert_called_once_with('s3://bucket/')
+    s3io_mock.list_prefix.assert_called_once_with(
+        's3://bucket/', with_metadata=True)
 
   @mock.patch('apache_beam.io.aws.s3filesystem.s3io')
   def test_match_multiples_error(self, unused_mock_arg):
@@ -120,7 +135,8 @@ class S3FileSystemTest(unittest.TestCase):
       self.fs.match(['s3://bucket/'])
 
     self.assertIn('Match operation failed', str(error.exception))
-    s3io_mock.list_prefix.assert_called_once_with('s3://bucket/')
+    s3io_mock.list_prefix.assert_called_once_with(
+        's3://bucket/', with_metadata=True)
 
   @mock.patch('apache_beam.io.aws.s3filesystem.s3io')
   def test_match_multiple_patterns(self, unused_mock_arg):
@@ -129,14 +145,14 @@ class S3FileSystemTest(unittest.TestCase):
     s3filesystem.s3io.S3IO = lambda options: s3io_mock  # type: ignore[misc]
     s3io_mock.list_prefix.side_effect = [
         {
-            's3://bucket/file1': 1
+            's3://bucket/file1': (1, 99999.0)
         },
         {
-            's3://bucket/file2': 2
+            's3://bucket/file2': (2, 88888.0)
         },
     ]
-    expected_results = [[FileMetadata('s3://bucket/file1', 1)],
-                        [FileMetadata('s3://bucket/file2', 2)]]
+    expected_results = [[FileMetadata('s3://bucket/file1', 1, 99999.0)],
+                        [FileMetadata('s3://bucket/file2', 2, 88888.0)]]
     result = self.fs.match(['s3://bucket/file1*', 's3://bucket/file2*'])
     self.assertEqual([mr.metadata_list for mr in result], expected_results)
 
diff --git a/sdks/python/apache_beam/io/aws/s3io.py b/sdks/python/apache_beam/io/aws/s3io.py
index ac0c92f0d37..d8bbfe164be 100644
--- a/sdks/python/apache_beam/io/aws/s3io.py
+++ b/sdks/python/apache_beam/io/aws/s3io.py
@@ -101,23 +101,28 @@ class S3IO(object):
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def list_prefix(self, path):
+  def list_prefix(self, path, with_metadata=False):
     """Lists files matching the prefix.
 
     Args:
       path: S3 file path pattern in the form s3://<bucket>/[name].
+      with_metadata: Experimental. Specify whether to return file metadata.
 
     Returns:
-      Dictionary of file name -> size.
+      If ``with_metadata`` is False: dict of file name -> size; if
+        ``with_metadata`` is True: dict of file name -> tuple(size, timestamp).
     """
     bucket, prefix = parse_s3_path(path, object_optional=True)
     request = messages.ListRequest(bucket=bucket, prefix=prefix)
 
-    file_sizes = {}
+    file_info = {}
     counter = 0
     start_time = time.time()
 
-    logging.info("Starting the size estimation of the input")
+    if with_metadata:
+      logging.info("Starting to collect file information for the input")
+    else:
+      logging.info("Starting the size estimation of the input")
 
     while True:
       #The list operation will raise an exception
@@ -134,10 +139,19 @@ class S3IO(object):
 
       for item in response.items:
         file_name = 's3://%s/%s' % (bucket, item.key)
-        file_sizes[file_name] = item.size
+        if with_metadata:
+          file_info[file_name] = (
+              item.size, self._updated_to_seconds(item.last_modified))
+        else:
+          file_info[file_name] = item.size
         counter += 1
         if counter % 10000 == 0:
-          logging.info("Finished computing size of: %s files", len(file_sizes))
+          if with_metadata:
+            logging.info(
+                "Finished computing file information of: %s files",
+                len(file_info))
+          else:
+            logging.info("Finished computing size of: %s files", len(file_info))
       if response.next_token:
         request.continuation_token = response.next_token
       else:
@@ -148,20 +162,15 @@ class S3IO(object):
         counter,
         time.time() - start_time)
 
-    return file_sizes
+    return file_info
 
-  @retry.with_exponential_backoff(
-      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
   def checksum(self, path):
     """Looks up the checksum of an S3 object.
 
     Args:
       path: S3 file path pattern in the form s3://<bucket>/<name>.
     """
-    bucket, object_path = parse_s3_path(path)
-    request = messages.GetRequest(bucket, object_path)
-    item = self.client.get_object_metadata(request)
-    return item.etag
+    return self._s3_object(path).etag
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
@@ -400,8 +409,6 @@ class S3IO(object):
     paths = self.list_prefix(root)
     return self.delete_files(paths)
 
-  @retry.with_exponential_backoff(
-      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
   def size(self, path):
     """Returns the size of a single S3 object.
 
@@ -410,10 +417,7 @@ class S3IO(object):
 
     Returns: size of the S3 object in bytes.
     """
-    bucket, object_path = parse_s3_path(path)
-    request = messages.GetRequest(bucket, object_path)
-    item = self.client.get_object_metadata(request)
-    return item.size
+    return self._s3_object(path).size
 
   # We intentionally do not decorate this method with a retry, since the
   # underlying copy and delete operations are already idempotent operations
@@ -428,8 +432,6 @@ class S3IO(object):
     self.copy(src, dest)
     self.delete(src)
 
-  @retry.with_exponential_backoff(
-      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
   def last_updated(self, path):
     """Returns the last updated epoch time of a single S3 object.
 
@@ -438,12 +440,7 @@ class S3IO(object):
 
     Returns: last updated time of the S3 object in second.
     """
-    bucket, object = parse_s3_path(path)
-    request = messages.GetRequest(bucket, object)
-    datetime = self.client.get_object_metadata(request).last_modified
-    return (
-        time.mktime(datetime.timetuple()) - time.timezone +
-        datetime.microsecond / 1000000.0)
+    return self._updated_to_seconds(self._s3_object(path).last_modified)
 
   def exists(self, path):
     """Returns whether the given S3 object exists.
@@ -451,10 +448,8 @@ class S3IO(object):
     Args:
       path: S3 file path pattern in the form s3://<bucket>/<name>.
     """
-    bucket, object = parse_s3_path(path)
-    request = messages.GetRequest(bucket, object)
     try:
-      self.client.get_object_metadata(request)
+      self._s3_object(path)
       return True
     except messages.S3ClientError as e:
       if e.code == 404:
@@ -464,6 +459,49 @@ class S3IO(object):
         # We re-raise all other exceptions
         raise
 
+  def _status(self, path):
+    """For internal use only; no backwards-compatibility guarantees.
+
+    Returns supported fields (checksum, last_updated, size) of a single object
+    as a dict at once.
+
+    This method does not perform glob expansion. Hence the given path must be
+    for a single S3 object.
+
+    Returns: dict of fields of the S3 object.
+    """
+    s3_object = self._s3_object(path)
+    file_status = {}
+    if hasattr(s3_object, 'etag'):
+      file_status['checksum'] = s3_object.etag
+    if hasattr(s3_object, 'last_modified'):
+      file_status['last_updated'] = self._updated_to_seconds(
+          s3_object.last_modified)
+    if hasattr(s3_object, 'size'):
+      file_status['size'] = s3_object.size
+    return file_status
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+  def _s3_object(self, path):
+    """Returns S3 object metadata for the given path.
+
+    This method does not perform glob expansion. Hence the given path must be
+    for a single S3 object.
+
+    Returns: S3 object metadata.
+    """
+    bucket, object = parse_s3_path(path)
+    request = messages.GetRequest(bucket, object)
+    return self.client.get_object_metadata(request)
+
+  @staticmethod
+  def _updated_to_seconds(updated):
+    """Helper to transform the updated field of a response to seconds."""
+    return (
+        time.mktime(updated.timetuple()) - time.timezone +
+        updated.microsecond / 1000000.0)
+
   def rename_files(self, src_dest_pairs):
     """Renames the given S3 objects from src to dest.
 
diff --git a/sdks/python/apache_beam/io/aws/s3io_test.py b/sdks/python/apache_beam/io/aws/s3io_test.py
index cbc05ca25d3..ffab9572707 100644
--- a/sdks/python/apache_beam/io/aws/s3io_test.py
+++ b/sdks/python/apache_beam/io/aws/s3io_test.py
@@ -113,13 +113,13 @@ class TestS3IO(unittest.TestCase):
     self.aws.delete(file_name)
 
   def test_last_updated(self):
-    self.skipTest('BEAM-9532 fix issue with s3 last updated')
     file_name = self.TEST_DATA_PATH + 'dummy_file'
     file_size = 1234
 
     self._insert_random_file(self.client, file_name, file_size)
     self.assertTrue(self.aws.exists(file_name))
-
+    # The time difference should be tiny for the mock client.
+    # A loose tolerance is used to accommodate a real S3 client.
     tolerance = 5 * 60  # 5 mins
     result = self.aws.last_updated(file_name)
     self.assertAlmostEqual(result, time.time(), delta=tolerance)
@@ -128,7 +128,6 @@ class TestS3IO(unittest.TestCase):
     self.aws.delete(file_name)
 
   def test_checksum(self):
-
     file_name = self.TEST_DATA_PATH + 'checksum'
     file_size = 1024
     file_ = self._insert_random_file(self.client, file_name, file_size)
@@ -149,6 +148,22 @@ class TestS3IO(unittest.TestCase):
     # Clean up
     self.aws.delete(file_name)
 
+  def test_file_status(self):
+    file_name = self.TEST_DATA_PATH + 'metadata'
+    file_size = 1024
+    self._insert_random_file(self.client, file_name, file_size)
+    file_checksum = self.aws.checksum(file_name)
+    file_timestamp = self.aws.last_updated(file_name)
+
+    file_status = self.aws._status(file_name)
+
+    self.assertEqual(file_status['size'], file_size)
+    self.assertEqual(file_status['checksum'], file_checksum)
+    self.assertEqual(file_status['last_updated'], file_timestamp)
+
+    # Clean up
+    self.aws.delete(file_name)
+
   def test_copy(self):
     src_file_name = self.TEST_DATA_PATH + 'source'
     dest_file_name = self.TEST_DATA_PATH + 'dest'
diff --git a/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py b/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
index 28a5a5841b3..19ba078cbbe 100644
--- a/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
+++ b/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
@@ -118,9 +118,9 @@ class BlobStorageFileSystem(FileSystem):
       ``BeamIOError``: if listing fails, but not if no files were found.
     """
     try:
-      for path, size in \
-          blobstorageio.BlobStorageIO().list_prefix(dir_or_prefix).items():
-        yield FileMetadata(path, size)
+      for path, (size, updated) in blobstorageio.BlobStorageIO() \
+        .list_prefix(dir_or_prefix, with_metadata=True).items():
+        yield FileMetadata(path, size, updated)
     except Exception as e:  # pylint: disable=broad-except
       raise BeamIOError("List operation failed", {dir_or_prefix: e})
 
@@ -276,6 +276,25 @@ class BlobStorageFileSystem(FileSystem):
     except Exception as e:  # pylint: disable=broad-except
       raise BeamIOError("Checksum operation failed", {path, e})
 
+  def metadata(self, path):
+    """Fetch metadata fields of a file on the FileSystem.
+
+    Args:
+      path: string path of a file.
+
+    Returns:
+      :class:`~apache_beam.io.filesystem.FileMetadata`.
+
+    Raises:
+      ``BeamIOError``: if path isn't a file or doesn't exist.
+    """
+    try:
+      file_metadata = blobstorageio.BlobStorageIO()._status(path)
+      return FileMetadata(
+          path, file_metadata['size'], file_metadata['last_updated'])
+    except Exception as e:  # pylint: disable=broad-except
+      raise BeamIOError("Metadata operation failed", {path: e})
+
   def delete(self, paths):
     """Deletes files or directories at the provided paths.
     Directories will be deleted recursively.
diff --git a/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py b/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py
index c67dbe9c5c6..241cc72a15d 100644
--- a/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py
+++ b/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py
@@ -83,6 +83,25 @@ class BlobStorageFileSystemTest(unittest.TestCase):
     with self.assertRaises(ValueError):
       self.fs.split('/no/azfs/prefix')
 
+  @mock.patch('apache_beam.io.azure.blobstoragefilesystem.blobstorageio')
+  def test_match_single(self, unused_mock_blobstorageio):
+    # Prepare mocks.
+    blobstorageio_mock = mock.MagicMock()
+    blobstoragefilesystem.blobstorageio.BlobStorageIO = \
+        lambda: blobstorageio_mock
+    blobstorageio_mock.exists.return_value = True
+    blobstorageio_mock._status.return_value = {
+        'size': 1, 'last_updated': 99999.0
+    }
+    expected_results = [
+        FileMetadata('azfs://storageaccount/container/file1', 1, 99999.0)
+    ]
+    match_result = self.fs.match(['azfs://storageaccount/container/file1'])[0]
+
+    self.assertEqual(match_result.metadata_list, expected_results)
+    blobstorageio_mock._status.assert_called_once_with(
+        'azfs://storageaccount/container/file1')
+
   @mock.patch('apache_beam.io.azure.blobstoragefilesystem.blobstorageio')
   def test_match_multiples(self, unused_mock_blobstorageio):
     # Prepare mocks.
@@ -90,18 +109,18 @@ class BlobStorageFileSystemTest(unittest.TestCase):
     blobstoragefilesystem.blobstorageio.BlobStorageIO = \
         lambda: blobstorageio_mock
     blobstorageio_mock.list_prefix.return_value = {
-        'azfs://storageaccount/container/file1': 1,
-        'azfs://storageaccount/container/file2': 2,
+        'azfs://storageaccount/container/file1': (1, 99999.0),
+        'azfs://storageaccount/container/file2': (2, 88888.0)
     }
     expected_results = set([
-        FileMetadata('azfs://storageaccount/container/file1', 1),
-        FileMetadata('azfs://storageaccount/container/file2', 2),
+        FileMetadata('azfs://storageaccount/container/file1', 1, 99999.0),
+        FileMetadata('azfs://storageaccount/container/file2', 2, 88888.0),
     ])
     match_result = self.fs.match(['azfs://storageaccount/container/'])[0]
 
     self.assertEqual(set(match_result.metadata_list), expected_results)
     blobstorageio_mock.list_prefix.assert_called_once_with(
-        'azfs://storageaccount/container/')
+        'azfs://storageaccount/container/', with_metadata=True)
 
   @mock.patch('apache_beam.io.azure.blobstoragefilesystem.blobstorageio')
   def test_match_multiples_limit(self, unused_mock_blobstorageio):
@@ -111,16 +130,16 @@ class BlobStorageFileSystemTest(unittest.TestCase):
     blobstoragefilesystem.blobstorageio.BlobStorageIO = \
         lambda: blobstorageio_mock
     blobstorageio_mock.list_prefix.return_value = {
-        'azfs://storageaccount/container/file1': 1
+        'azfs://storageaccount/container/file1': (1, 99999.0)
     }
     expected_results = set(
-        [FileMetadata('azfs://storageaccount/container/file1', 1)])
+        [FileMetadata('azfs://storageaccount/container/file1', 1, 99999.0)])
     match_result = self.fs.match(['azfs://storageaccount/container/'],
                                  [limit])[0]
     self.assertEqual(set(match_result.metadata_list), expected_results)
     self.assertEqual(len(match_result.metadata_list), limit)
     blobstorageio_mock.list_prefix.assert_called_once_with(
-        'azfs://storageaccount/container/')
+        'azfs://storageaccount/container/', with_metadata=True)
 
   @mock.patch('apache_beam.io.azure.blobstoragefilesystem.blobstorageio')
   def test_match_multiples_error(self, unused_mock_blobstorageio):
@@ -139,7 +158,7 @@ class BlobStorageFileSystemTest(unittest.TestCase):
         str(error.exception.exception_details),
         r'azfs://storageaccount/container/.*%s' % exception)
     blobstorageio_mock.list_prefix.assert_called_once_with(
-        'azfs://storageaccount/container/')
+        'azfs://storageaccount/container/', with_metadata=True)
 
   @mock.patch('apache_beam.io.azure.blobstoragefilesystem.blobstorageio')
   def test_match_multiple_patterns(self, unused_mock_blobstorageio):
@@ -149,15 +168,15 @@ class BlobStorageFileSystemTest(unittest.TestCase):
         lambda: blobstorageio_mock
     blobstorageio_mock.list_prefix.side_effect = [
         {
-            'azfs://storageaccount/container/file1': 1
+            'azfs://storageaccount/container/file1': (1, 99999.0)
         },
         {
-            'azfs://storageaccount/container/file2': 2
+            'azfs://storageaccount/container/file2': (2, 88888.0)
         },
     ]
     expected_results = [
-        [FileMetadata('azfs://storageaccount/container/file1', 1)],
-        [FileMetadata('azfs://storageaccount/container/file2', 2)]
+        [FileMetadata('azfs://storageaccount/container/file1', 1, 99999.0)],
+        [FileMetadata('azfs://storageaccount/container/file2', 2, 88888.0)]
     ]
     result = self.fs.match([
         'azfs://storageaccount/container/file1*',
diff --git a/sdks/python/apache_beam/io/azure/blobstorageio.py b/sdks/python/apache_beam/io/azure/blobstorageio.py
index d27b81ef852..290b5228198 100644
--- a/sdks/python/apache_beam/io/azure/blobstorageio.py
+++ b/sdks/python/apache_beam/io/azure/blobstorageio.py
@@ -317,8 +317,6 @@ class BlobStorageIO(object):
 
     return results
 
-  @retry.with_exponential_backoff(
-      retry_filter=retry.retry_on_beam_io_error_filter)
   def exists(self, path):
     """Returns whether the given Azure Blob Storage blob exists.
 
@@ -326,10 +324,8 @@ class BlobStorageIO(object):
       path: Azure Blob Storage file path pattern in the form
             azfs://<storage-account>/<container>/[name].
     """
-    container, blob = parse_azfs_path(path)
-    blob_to_check = self.client.get_blob_client(container, blob)
     try:
-      blob_to_check.get_blob_properties()
+      self._blob_properties(path)
       return True
     except ResourceNotFoundError as e:
       if e.status_code == 404:
@@ -339,8 +335,6 @@ class BlobStorageIO(object):
         # We re-raise all other exceptions.
         raise
 
-  @retry.with_exponential_backoff(
-      retry_filter=retry.retry_on_beam_io_error_filter)
   def size(self, path):
     """Returns the size of a single Blob Storage blob.
 
@@ -349,19 +343,8 @@ class BlobStorageIO(object):
 
     Returns: size of the Blob Storage blob in bytes.
     """
-    container, blob = parse_azfs_path(path)
-    blob_to_check = self.client.get_blob_client(container, blob)
-    try:
-      properties = blob_to_check.get_blob_properties()
-    except ResourceNotFoundError as e:
-      message = e.reason
-      code = e.status_code
-      raise BlobStorageError(message, code)
+    return self._blob_properties(path).size
 
-    return properties.size
-
-  @retry.with_exponential_backoff(
-      retry_filter=retry.retry_on_beam_io_error_filter)
   def last_updated(self, path):
     """Returns the last updated epoch time of a single
     Azure Blob Storage blob.
@@ -372,22 +355,8 @@ class BlobStorageIO(object):
     Returns: last updated time of the Azure Blob Storage blob
     in seconds.
     """
-    container, blob = parse_azfs_path(path)
-    blob_to_check = self.client.get_blob_client(container, blob)
-    try:
-      properties = blob_to_check.get_blob_properties()
-    except ResourceNotFoundError as e:
-      message = e.reason
-      code = e.status_code
-      raise BlobStorageError(message, code)
+    return self._updated_to_seconds(self._blob_properties(path).last_modified)
 
-    datatime = properties.last_modified
-    return (
-        time.mktime(datatime.timetuple()) - time.timezone +
-        datatime.microsecond / 1000000.0)
-
-  @retry.with_exponential_backoff(
-      retry_filter=retry.retry_on_beam_io_error_filter)
   def checksum(self, path):
     """Looks up the checksum of an Azure Blob Storage blob.
 
@@ -395,6 +364,40 @@ class BlobStorageIO(object):
       path: Azure Blob Storage file path pattern in the form
             azfs://<storage-account>/<container>/[name].
     """
+    return self._blob_properties(path).properties.etag
+
+  def _status(self, path):
+    """For internal use only; no backwards-compatibility guarantees.
+
+    Returns supported fields (checksum, last_updated, size) of a single object
+    as a dict at once.
+
+    This method does not perform glob expansion. Hence the given path must be
+    for a single blob.
+
+    Returns: dict of fields of the blob.
+    """
+    properties = self._blob_properties(path)
+    file_status = {}
+    if hasattr(properties, 'etag'):
+      file_status['checksum'] = properties.etag
+    if hasattr(properties, 'last_modified'):
+      file_status['last_updated'] = self._updated_to_seconds(
+          properties.last_modified)
+    if hasattr(properties, 'size'):
+      file_status['size'] = properties.size
+    return file_status
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def _blob_properties(self, path):
+    """Returns a blob properties object for the given path.
+
+    This method does not perform glob expansion. Hence the given path must be
+    for a single blob.
+
+    Returns: blob properties.
+    """
     container, blob = parse_azfs_path(path)
     blob_to_check = self.client.get_blob_client(container, blob)
     try:
@@ -404,7 +407,14 @@ class BlobStorageIO(object):
       code = e.status_code
       raise BlobStorageError(message, code)
 
-    return properties.etag
+    return properties
+
+  @staticmethod
+  def _updated_to_seconds(updated):
+    """Helper to transform the updated field of a response to seconds."""
+    return (
+        time.mktime(updated.timetuple()) - time.timezone +
+        updated.microsecond / 1000000.0)
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_beam_io_error_filter)
@@ -559,40 +569,54 @@ class BlobStorageIO(object):
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_beam_io_error_filter)
-  def list_prefix(self, path):
+  def list_prefix(self, path, with_metadata=False):
     """Lists files matching the prefix.
 
     Args:
       path: Azure Blob Storage file path pattern in the form
             azfs://<storage-account>/<container>/[name].
+      with_metadata: Experimental. Specify whether to return file metadata.
 
     Returns:
-      Dictionary of file name -> size.
+      If ``with_metadata`` is False: dict of file name -> size; if
+        ``with_metadata`` is True: dict of file name -> tuple(size, timestamp).
     """
     storage_account, container, blob = parse_azfs_path(
         path, blob_optional=True, get_account=True)
-    file_sizes = {}
+    file_info = {}
     counter = 0
     start_time = time.time()
 
-    logging.info("Starting the size estimation of the input")
+    if with_metadata:
+      logging.info("Starting to collect file information for the input")
+    else:
+      logging.info("Starting the size estimation of the input")
     container_client = self.client.get_container_client(container)
 
     while True:
       response = container_client.list_blobs(name_starts_with=blob)
       for item in response:
         file_name = "azfs://%s/%s/%s" % (storage_account, container, item.name)
-        file_sizes[file_name] = item.size
+        if with_metadata:
+          file_info[file_name] = (
+              item.size, self._updated_to_seconds(item.last_modified))
+        else:
+          file_info[file_name] = item.size
         counter += 1
         if counter % 10000 == 0:
-          logging.info("Finished computing size of: %s files", len(file_sizes))
+          if with_metadata:
+            logging.info(
+                "Finished computing file information of: %s files",
+                len(file_info))
+          else:
+            logging.info("Finished computing size of: %s files", len(file_info))
       break
 
     logging.info(
         "Finished listing %s files in %s seconds.",
         counter,
         time.time() - start_time)
-    return file_sizes
+    return file_info
 
 
 class BlobStorageDownloader(Downloader):
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index c854e587972..a833eef282f 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -421,27 +421,44 @@ class CompressedFile(object):
 
 
 class FileMetadata(object):
-  """Metadata about a file path that is the output of FileSystem.match."""
-  def __init__(self, path, size_in_bytes):
+  """Metadata about a file path that is the output of FileSystem.match.
+
+  Fields:
+    path: [Required] file path.
+    size_in_bytes: [Required] file size in bytes.
+    last_updated_in_seconds: [Optional] last modified timestamp of the file,
+      or 0.0 if not specified.
+  """
+  def __init__(
+      self,
+      path: str,
+      size_in_bytes: int,
+      last_updated_in_seconds: float = 0.0):
     assert isinstance(path, str) and path, "Path should be a string"
     assert isinstance(size_in_bytes, int) and size_in_bytes >= 0, \
         "Invalid value for size_in_bytes should %s (of type %s)" % (
             size_in_bytes, type(size_in_bytes))
     self.path = path
     self.size_in_bytes = size_in_bytes
+    self.last_updated_in_seconds = last_updated_in_seconds
 
   def __eq__(self, other):
     """Note: This is only used in tests where we verify that mock objects match.
     """
     return (
         isinstance(other, FileMetadata) and self.path == other.path and
-        self.size_in_bytes == other.size_in_bytes)
+        self.size_in_bytes == other.size_in_bytes and
+        self.last_updated_in_seconds == other.last_updated_in_seconds)
 
   def __hash__(self):
-    return hash((self.path, self.size_in_bytes))
+    return hash((self.path, self.size_in_bytes, self.last_updated_in_seconds))
 
   def __repr__(self):
-    return 'FileMetadata(%s, %s)' % (self.path, self.size_in_bytes)
+    if self.last_updated_in_seconds == 0.0:
+      return 'FileMetadata(%s, %s)' % (self.path, self.size_in_bytes)
+    else:
+      return 'FileMetadata(%s, %s, %s)' % (
+          self.path, self.size_in_bytes, self.last_updated_in_seconds)
 
 
 class MatchResult(object):
@@ -717,7 +734,7 @@ class FileSystem(BeamPlugin, metaclass=abc.ABCMeta):
       if prefix_or_dir == pattern:
         # Short-circuit calling self.list() if there's no glob pattern to match.
         if self.exists(pattern):
-          file_metadatas = [FileMetadata(pattern, self.size(pattern))]
+          file_metadatas = [self.metadata(pattern)]
       else:
         if self.has_dirs():
           prefix_dirname = self._url_dirname(prefix_or_dir)
@@ -878,6 +895,27 @@ class FileSystem(BeamPlugin, metaclass=abc.ABCMeta):
     """
     raise NotImplementedError
 
+  @abc.abstractmethod
+  def metadata(self, path):
+    """Fetch metadata of a file on the
+    :class:`~apache_beam.io.filesystem.FileSystem`.
+
+    This operation returns metadata as stored in the underlying
+    FileSystem. It should not need to read file data to obtain this value.
+    For web-based file systems, this method should also incur as few
+    requests as possible.
+
+    Args:
+      path: string path of a file.
+
+    Returns:
+      :class:`~apache_beam.io.filesystem.FileMetadata`.
+
+    Raises:
+      ``BeamIOError``: if path isn't a file or doesn't exist.
+    """
+    raise NotImplementedError
+
   @abc.abstractmethod
   def delete(self, paths):
     """Deletes files or directories at the provided paths.
diff --git a/sdks/python/apache_beam/io/filesystem_test.py b/sdks/python/apache_beam/io/filesystem_test.py
index a2463352de3..4f57fd52bc8 100644
--- a/sdks/python/apache_beam/io/filesystem_test.py
+++ b/sdks/python/apache_beam/io/filesystem_test.py
@@ -103,6 +103,9 @@ class TestingFileSystem(FileSystem):
   def checksum(self, path):
     raise NotImplementedError
 
+  def metadata(self, path):
+    raise NotImplementedError
+
   def delete(self, paths):
     raise NotImplementedError
 
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
index e53ceef70c9..6e4b1b6c7f0 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -120,8 +120,9 @@ class GCSFileSystem(FileSystem):
       ``BeamIOError``: if listing fails, but not if no files were found.
     """
     try:
-      for path, size in gcsio.GcsIO().list_prefix(dir_or_prefix).items():
-        yield FileMetadata(path, size)
+      for path, (size, updated) in gcsio.GcsIO().list_prefix(
+          dir_or_prefix, with_metadata=True).items():
+        yield FileMetadata(path, size, updated)
     except Exception as e:  # pylint: disable=broad-except
       raise BeamIOError("List operation failed", {dir_or_prefix: e})
 
@@ -311,6 +312,25 @@ class GCSFileSystem(FileSystem):
     except Exception as e:  # pylint: disable=broad-except
       raise BeamIOError("Checksum operation failed", {path: e})
 
+  def metadata(self, path):
+    """Fetch metadata fields of a file on the FileSystem.
+
+    Args:
+      path: string path of a file.
+
+    Returns:
+      :class:`~apache_beam.io.filesystem.FileMetadata`.
+
+    Raises:
+      ``BeamIOError``: if path isn't a file or doesn't exist.
+    """
+    try:
+      file_metadata = gcsio.GcsIO()._status(path)
+      return FileMetadata(
+          path, file_metadata['size'], file_metadata['last_updated'])
+    except Exception as e:  # pylint: disable=broad-except
+      raise BeamIOError("Metadata operation failed", {path: e})
+
   def delete(self, paths):
     """Deletes files or directories at the provided paths.
     Directories will be deleted recursively.
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
index c5f80bb6a1d..b4d921ada23 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
@@ -83,15 +83,16 @@ class GCSFileSystemTest(unittest.TestCase):
     gcsio_mock = mock.MagicMock()
     gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
     gcsio_mock.list_prefix.return_value = {
-        'gs://bucket/file1': 1, 'gs://bucket/file2': 2
+        'gs://bucket/file1': (1, 99999.0), 'gs://bucket/file2': (2, 88888.0)
     }
     expected_results = set([
-        FileMetadata('gs://bucket/file1', 1),
-        FileMetadata('gs://bucket/file2', 2)
+        FileMetadata('gs://bucket/file1', 1, 99999.0),
+        FileMetadata('gs://bucket/file2', 2, 88888.0)
     ])
     match_result = self.fs.match(['gs://bucket/'])[0]
     self.assertEqual(set(match_result.metadata_list), expected_results)
-    gcsio_mock.list_prefix.assert_called_once_with('gs://bucket/')
+    gcsio_mock.list_prefix.assert_called_once_with(
+        'gs://bucket/', with_metadata=True)
 
   @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
   def test_match_multiples_limit(self, mock_gcsio):
@@ -99,12 +100,13 @@ class GCSFileSystemTest(unittest.TestCase):
     gcsio_mock = mock.MagicMock()
     limit = 1
     gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
-    gcsio_mock.list_prefix.return_value = {'gs://bucket/file1': 1}
-    expected_results = set([FileMetadata('gs://bucket/file1', 1)])
+    gcsio_mock.list_prefix.return_value = {'gs://bucket/file1': (1, 99999.0)}
+    expected_results = set([FileMetadata('gs://bucket/file1', 1, 99999.0)])
     match_result = self.fs.match(['gs://bucket/'], [limit])[0]
     self.assertEqual(set(match_result.metadata_list), expected_results)
     self.assertEqual(len(match_result.metadata_list), limit)
-    gcsio_mock.list_prefix.assert_called_once_with('gs://bucket/')
+    gcsio_mock.list_prefix.assert_called_once_with(
+        'gs://bucket/', with_metadata=True)
 
   @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
   def test_match_multiples_error(self, mock_gcsio):
@@ -119,7 +121,8 @@ class GCSFileSystemTest(unittest.TestCase):
       self.fs.match(['gs://bucket/'])
     self.assertRegex(
         str(error.exception.exception_details), r'gs://bucket/.*%s' % exception)
-    gcsio_mock.list_prefix.assert_called_once_with('gs://bucket/')
+    gcsio_mock.list_prefix.assert_called_once_with(
+        'gs://bucket/', with_metadata=True)
 
   @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
   def test_match_multiple_patterns(self, mock_gcsio):
@@ -128,14 +131,14 @@ class GCSFileSystemTest(unittest.TestCase):
     gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
     gcsio_mock.list_prefix.side_effect = [
         {
-            'gs://bucket/file1': 1
+            'gs://bucket/file1': (1, 99999.0)
         },
         {
-            'gs://bucket/file2': 2
+            'gs://bucket/file2': (2, 88888.0)
         },
     ]
-    expected_results = [[FileMetadata('gs://bucket/file1', 1)],
-                        [FileMetadata('gs://bucket/file2', 2)]]
+    expected_results = [[FileMetadata('gs://bucket/file1', 1, 99999.0)],
+                        [FileMetadata('gs://bucket/file2', 2, 88888.0)]]
     result = self.fs.match(['gs://bucket/file1*', 'gs://bucket/file2*'])
     self.assertEqual([mr.metadata_list for mr in result], expected_results)
 
@@ -306,7 +309,7 @@ class GCSFileSystemTest(unittest.TestCase):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
     gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
-    gcsio_mock.size.return_value = 0
+    gcsio_mock._status.return_value = {'size': 0, 'last_updated': 99999.0}
     files = [
         'gs://bucket/from1',
         'gs://bucket/from2',
@@ -324,7 +327,7 @@ class GCSFileSystemTest(unittest.TestCase):
     gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
     exception = IOError('Failed')
     gcsio_mock.delete_batch.side_effect = exception
-    gcsio_mock.size.return_value = 0
+    gcsio_mock._status.return_value = {'size': 0, 'last_updated': 99999.0}
     files = [
         'gs://bucket/from1',
         'gs://bucket/from2',
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 86046e340d2..87654bc8174 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -460,19 +460,14 @@ class GcsIO(object):
     self.copy(src, dest)
     self.delete(src)
 
-  @retry.with_exponential_backoff(
-      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
   def exists(self, path):
     """Returns whether the given GCS object exists.
 
     Args:
       path: GCS file path pattern in the form gs://<bucket>/<name>.
     """
-    bucket, object_path = parse_gcs_path(path)
     try:
-      request = storage.StorageObjectsGetRequest(
-          bucket=bucket, object=object_path)
-      self.client.objects.Get(request)  # metadata
+      self._gcs_object(path)  # gcs object
       return True
     except HttpError as http_error:
       if http_error.status_code == 404:
@@ -482,21 +477,14 @@ class GcsIO(object):
         # We re-raise all other exceptions
         raise
 
-  @retry.with_exponential_backoff(
-      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
   def checksum(self, path):
     """Looks up the checksum of a GCS object.
 
     Args:
       path: GCS file path pattern in the form gs://<bucket>/<name>.
     """
-    bucket, object_path = parse_gcs_path(path)
-    request = storage.StorageObjectsGetRequest(
-        bucket=bucket, object=object_path)
-    return self.client.objects.Get(request).crc32c
+    return self._gcs_object(path).crc32c
 
-  @retry.with_exponential_backoff(
-      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
   def size(self, path):
     """Returns the size of a single GCS object.
 
@@ -505,13 +493,8 @@ class GcsIO(object):
 
     Returns: size of the GCS object in bytes.
     """
-    bucket, object_path = parse_gcs_path(path)
-    request = storage.StorageObjectsGetRequest(
-        bucket=bucket, object=object_path)
-    return self.client.objects.Get(request).size
+    return self._gcs_object(path).size
 
-  @retry.with_exponential_backoff(
-      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
   def kms_key(self, path):
     """Returns the KMS key of a single GCS object.
 
@@ -521,13 +504,8 @@ class GcsIO(object):
     Returns: KMS key name of the GCS object as a string, or None if it doesn't
       have one.
     """
-    bucket, object_path = parse_gcs_path(path)
-    request = storage.StorageObjectsGetRequest(
-        bucket=bucket, object=object_path)
-    return self.client.objects.Get(request).kmsKeyName
+    return self._gcs_object(path).kmsKeyName
 
-  @retry.with_exponential_backoff(
-      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
   def last_updated(self, path):
     """Returns the last updated epoch time of a single GCS object.
 
@@ -536,39 +514,85 @@ class GcsIO(object):
 
     Returns: last updated time of the GCS object in second.
     """
+    return self._updated_to_seconds(self._gcs_object(path).updated)
+
+  def _status(self, path):
+    """For internal use only; no backwards-compatibility guarantees.
+
+    Returns supported fields (checksum, kms_key, last_updated, size) of a
+    single object as a dict at once.
+
+    This method does not perform glob expansion. Hence the given path must be
+    for a single GCS object.
+
+    Returns: dict of fields of the GCS object.
+    """
+    gcs_object = self._gcs_object(path)
+    file_status = {}
+    if hasattr(gcs_object, 'crc32c'):
+      file_status['checksum'] = gcs_object.crc32c
+    if hasattr(gcs_object, 'kmsKeyName'):
+      file_status['kms_key'] = gcs_object.kmsKeyName
+    if hasattr(gcs_object, 'updated'):
+      file_status['last_updated'] = self._updated_to_seconds(gcs_object.updated)
+    if hasattr(gcs_object, 'size'):
+      file_status['size'] = gcs_object.size
+    return file_status
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+  def _gcs_object(self, path):
+    """Returns a GCS object for the given path.
+
+    This method does not perform glob expansion. Hence the given path must be
+    for a single GCS object.
+
+    Returns: GCS object.
+    """
     bucket, object_path = parse_gcs_path(path)
     request = storage.StorageObjectsGetRequest(
         bucket=bucket, object=object_path)
-    datetime = self.client.objects.Get(request).updated
-    return (
-        time.mktime(datetime.timetuple()) - time.timezone +
-        datetime.microsecond / 1000000.0)
+    return self.client.objects.Get(request)
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def list_prefix(self, path):
+  def list_prefix(self, path, with_metadata=False):
     """Lists files matching the prefix.
 
     Args:
       path: GCS file path pattern in the form gs://<bucket>/[name].
+      with_metadata: Experimental. Specify whether to return file metadata.
 
     Returns:
-      Dictionary of file name -> size.
+      If ``with_metadata`` is False: dict of file name -> size; if
+        ``with_metadata`` is True: dict of file name -> tuple(size, timestamp).
     """
     bucket, prefix = parse_gcs_path(path, object_optional=True)
     request = storage.StorageObjectsListRequest(bucket=bucket, prefix=prefix)
-    file_sizes = {}
+    file_info = {}
     counter = 0
     start_time = time.time()
-    _LOGGER.info("Starting the size estimation of the input")
+    if with_metadata:
+      _LOGGER.info("Starting to collect file information for the input")
+    else:
+      _LOGGER.info("Starting the size estimation of the input")
     while True:
       response = self.client.objects.List(request)
       for item in response.items:
         file_name = 'gs://%s/%s' % (item.bucket, item.name)
-        file_sizes[file_name] = item.size
+        if with_metadata:
+          file_info[file_name] = (
+              item.size, self._updated_to_seconds(item.updated))
+        else:
+          file_info[file_name] = item.size
         counter += 1
         if counter % 10000 == 0:
-          _LOGGER.info("Finished computing size of: %s files", len(file_sizes))
+          if with_metadata:
+            _LOGGER.info(
+                "Finished computing file information of: %s files",
+                len(file_info))
+          else:
+            _LOGGER.info("Finished computing size of: %s files", len(file_info))
       if response.nextPageToken:
         request.pageToken = response.nextPageToken
       else:
@@ -577,7 +601,14 @@ class GcsIO(object):
         "Finished listing %s files in %s seconds.",
         counter,
         time.time() - start_time)
-    return file_sizes
+    return file_info
+
+  @staticmethod
+  def _updated_to_seconds(updated):
+    """Helper to transform the updated field of a response to seconds."""
+    return (
+        time.mktime(updated.timetuple()) - time.timezone +
+        updated.microsecond / 1000000.0)
 
 
 class GcsDownloader(Downloader):
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index 73a7984f320..a4aa2d4aa85 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -405,6 +405,26 @@ class TestGCSIO(unittest.TestCase):
     self.assertTrue(self.gcs.exists(file_name))
     self.assertEqual(last_updated, self.gcs.last_updated(file_name))
 
+  def test_file_status(self):
+    file_name = 'gs://gcsio-test/dummy_file'
+    file_size = 1234
+    last_updated = 123456.78
+    checksum = 'deadbeef'
+
+    self._insert_random_file(
+        self.client,
+        file_name,
+        file_size,
+        last_updated=last_updated,
+        crc32c=checksum)
+    file_checksum = self.gcs.checksum(file_name)
+
+    file_status = self.gcs._status(file_name)
+
+    self.assertEqual(file_status['size'], file_size)
+    self.assertEqual(file_status['checksum'], file_checksum)
+    self.assertEqual(file_status['last_updated'], last_updated)
+
   def test_file_mode(self):
     file_name = 'gs://gcsio-test/dummy_mode_file'
     with self.gcs.open(file_name, 'wb') as f:
diff --git a/sdks/python/apache_beam/io/hadoopfilesystem.py b/sdks/python/apache_beam/io/hadoopfilesystem.py
index 046908d930e..825e9ca8326 100644
--- a/sdks/python/apache_beam/io/hadoopfilesystem.py
+++ b/sdks/python/apache_beam/io/hadoopfilesystem.py
@@ -51,6 +51,7 @@ _FILE_CHECKSUM_BYTES = 'bytes'
 _FILE_CHECKSUM_LENGTH = 'length'
 # WebHDFS FileStatus property constants.
 _FILE_STATUS_LENGTH = 'length'
+_FILE_STATUS_UPDATED = 'modificationTime'
 _FILE_STATUS_PATH_SUFFIX = 'pathSuffix'
 _FILE_STATUS_TYPE = 'type'
 _FILE_STATUS_TYPE_DIRECTORY = 'DIRECTORY'
@@ -212,7 +213,8 @@ class HadoopFileSystem(FileSystem):
       for res in self._hdfs_client.list(path, status=True):
         yield FileMetadata(
             _HDFS_PREFIX + self._join(server, path, res[0]),
-            res[1][_FILE_STATUS_LENGTH])
+            res[1][_FILE_STATUS_LENGTH],
+            res[1][_FILE_STATUS_UPDATED] / 1000.0)
     except Exception as e:  # pylint: disable=broad-except
       raise BeamIOError('List operation failed', {url: e})
 
@@ -376,20 +378,37 @@ class HadoopFileSystem(FileSystem):
     return self._hdfs_client.status(path, strict=False) is not None
 
   def size(self, url):
-    _, path = self._parse_url(url)
-    status = self._hdfs_client.status(path, strict=False)
-    if status is None:
-      raise BeamIOError('File not found: %s' % url)
-    return status[_FILE_STATUS_LENGTH]
+    """Fetches file size for a URL.
+
+    Returns:
+      int size of path according to the FileSystem.
+
+    Raises:
+      ``BeamIOError``: if url doesn't exist.
+    """
+    return self.metadata(url).size_in_bytes
 
   def last_updated(self, url):
-    raise NotImplementedError
+    """Fetches last updated time for a URL.
+
+    Args:
+      url: string url of file.
+
+    Returns: float UNIX epoch time in seconds.
+
+    Raises:
+      ``BeamIOError``: if path doesn't exist.
+    """
+    return self.metadata(url).last_updated_in_seconds
 
   def checksum(self, url):
     """Fetches a checksum description for a URL.
 
     Returns:
       String describing the checksum.
+
+    Raises:
+      ``BeamIOError``: if url doesn't exist.
     """
     _, path = self._parse_url(url)
     file_checksum = self._hdfs_client.checksum(path)
@@ -399,6 +418,25 @@ class HadoopFileSystem(FileSystem):
         file_checksum[_FILE_CHECKSUM_BYTES],
     )
 
+  def metadata(self, url):
+    """Fetch metadata fields of a file on the FileSystem.
+
+    Args:
+      url: string url of a file.
+
+    Returns:
+      :class:`~apache_beam.io.filesystem.FileMetadata`.
+
+    Raises:
+      ``BeamIOError``: if url doesn't exist.
+    """
+    _, path = self._parse_url(url)
+    status = self._hdfs_client.status(path, strict=False)
+    if status is None:
+      raise BeamIOError('File not found: %s' % url)
+    return FileMetadata(
+        url, status[_FILE_STATUS_LENGTH], status[_FILE_STATUS_UPDATED] / 1000.0)
+
   def delete(self, urls):
     exceptions = {}
     for url in urls:
diff --git a/sdks/python/apache_beam/io/hadoopfilesystem_test.py b/sdks/python/apache_beam/io/hadoopfilesystem_test.py
index ed56927cec0..2486b1c8717 100644
--- a/sdks/python/apache_beam/io/hadoopfilesystem_test.py
+++ b/sdks/python/apache_beam/io/hadoopfilesystem_test.py
@@ -22,6 +22,7 @@
 import io
 import logging
 import posixpath
+import time
 import unittest
 
 from parameterized import parameterized_class
@@ -36,17 +37,16 @@ class FakeFile(io.BytesIO):
   """File object for FakeHdfs"""
   __hash__ = None  # type: ignore[assignment]
 
-  def __init__(self, path, mode='', type='FILE'):
+  def __init__(self, path, mode='', type='FILE', time_ms=None):
     io.BytesIO.__init__(self)
-
-    self.stat = {
-        'path': path,
-        'mode': mode,
-        'type': type,
-    }
+    if time_ms is None:
+      time_ms = int(time.time() * 1000)
+    self.time_ms = time_ms
+    self.stat = {'path': path, 'mode': mode, 'type': type}
     self.saved_data = None
 
   def __eq__(self, other):
+    """Equality of two files. Timestamp is not included in comparison."""
     return self.stat == other.stat and self.getvalue() == self.getvalue()
 
   def close(self):
@@ -73,6 +73,7 @@ class FakeFile(io.BytesIO):
         hdfs._FILE_STATUS_PATH_SUFFIX: posixpath.basename(self.stat['path']),
         hdfs._FILE_STATUS_LENGTH: self.size,
         hdfs._FILE_STATUS_TYPE: self.stat['type'],
+        hdfs._FILE_STATUS_UPDATED: self.time_ms
     }
 
   def get_file_checksum(self):
@@ -538,6 +539,16 @@ class HadoopFileSystemTest(unittest.TestCase):
     self.assertEqual(
         'fake_algo-5-checksum_byte_sequence', self.fs.checksum(url))
 
+  def test_last_updated(self):
+    url = self.fs.join(self.tmpdir, 'f1')
+    with self.fs.create(url) as f:
+      f.write(b'Hello')
+    # The time difference should be tiny for the mock hdfs.
+    # A loose tolerance is used to accommodate a real WebHDFS service.
+    tolerance = 5 * 60  # 5 mins
+    result = self.fs.last_updated(url)
+    self.assertAlmostEqual(result, time.time(), delta=tolerance)
+
   def test_delete_file(self):
     url = self.fs.join(self.tmpdir, 'old_file1')
 
diff --git a/sdks/python/apache_beam/io/localfilesystem.py b/sdks/python/apache_beam/io/localfilesystem.py
index 56668aa8952..3580b79ea56 100644
--- a/sdks/python/apache_beam/io/localfilesystem.py
+++ b/sdks/python/apache_beam/io/localfilesystem.py
@@ -121,7 +121,7 @@ class LocalFileSystem(FileSystem):
     try:
       for f in list_files(dir_or_prefix):
         try:
-          yield FileMetadata(f, os.path.getsize(f))
+          yield FileMetadata(f, os.path.getsize(f), os.path.getmtime(f))
         except OSError:
           # Files may disappear, such as when listing /tmp.
           pass
@@ -311,6 +311,22 @@ class LocalFileSystem(FileSystem):
       raise BeamIOError('Path does not exist: %s' % path)
     return str(os.path.getsize(path))
 
+  def metadata(self, path):
+    """Fetch metadata fields of a file on the FileSystem.
+
+    Args:
+      path: string path of a file.
+
+    Returns:
+      :class:`~apache_beam.io.filesystem.FileMetadata`.
+
+    Raises:
+      ``BeamIOError``: if path isn't a file or doesn't exist.
+    """
+    if not self.exists(path):
+      raise BeamIOError('Path does not exist: %s' % path)
+    return FileMetadata(path, os.path.getsize(path), os.path.getmtime(path))
+
   def delete(self, paths):
     """Deletes files or directories at the provided paths.
     Directories will be deleted recursively.
diff --git a/sdks/python/apache_beam/io/localfilesystem_test.py b/sdks/python/apache_beam/io/localfilesystem_test.py
index 84455c28f4c..1370790970e 100644
--- a/sdks/python/apache_beam/io/localfilesystem_test.py
+++ b/sdks/python/apache_beam/io/localfilesystem_test.py
@@ -293,8 +293,14 @@ class LocalFileSystemTest(unittest.TestCase):
       f.write('Hello')
     with open(path2, 'a') as f:
       f.write('foo')
-    self.assertEqual(self.fs.checksum(path1), str(5))
-    self.assertEqual(self.fs.checksum(path2), str(3))
+    # tests that localfilesystem checksum returns file size
+    checksum1 = self.fs.checksum(path1)
+    checksum2 = self.fs.checksum(path2)
+    self.assertEqual(checksum1, str(5))
+    self.assertEqual(checksum2, str(3))
+    # tests that fs.checksum and str(fs.size) are consistent
+    self.assertEqual(checksum1, str(self.fs.size(path1)))
+    self.assertEqual(checksum2, str(self.fs.size(path2)))
 
   def make_tree(self, path, value, expected_leaf_count=None):
     """Create a file+directory structure from a simple dict-based DSL