You are viewing a plain-text version of this content. The canonical version is available at the original archive link, which is not included in this plain-text copy.
Posted to commits@beam.apache.org by ie...@apache.org on 2019/01/18 09:25:24 UTC

[beam] branch master updated: [BEAM-5910] Add lastModified field to MatchResult.Metadata

This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 410d6c7  [BEAM-5910] Add lastModified field to MatchResult.Metadata
     new 137e86a  Merge pull request #6914: [BEAM-5910] Add lastModified field to MatchResult.Metadata
410d6c7 is described below

commit 410d6c7b5f933dcb0280894553c1e576ee4e4884
Author: Jeff Klukas <je...@klukas.net>
AuthorDate: Thu Nov 1 14:50:32 2018 -0400

    [BEAM-5910] Add lastModified field to MatchResult.Metadata
---
 .../main/java/org/apache/beam/sdk/io/FileIO.java   |  1 +
 .../org/apache/beam/sdk/io/LocalFileSystem.java    |  1 +
 .../org/apache/beam/sdk/io/fs/MatchResult.java     | 31 ++++++++++-
 .../org/apache/beam/sdk/io/fs/MetadataCoder.java   | 22 ++++++--
 .../{MetadataCoder.java => MetadataCoderV2.java}   | 36 ++++++------
 .../java/org/apache/beam/sdk/io/FileIOTest.java    | 31 ++++++++---
 .../apache/beam/sdk/io/fs/MetadataCoderTest.java   | 65 ++++++++++++++++++++++
 .../apache/beam/sdk/io/fs/MetadataCoderV2Test.java | 64 +++++++++++++++++++++
 .../sdk/extensions/gcp/storage/GcsFileSystem.java  |  3 +
 .../apache/beam/sdk/io/aws/s3/S3FileSystem.java    |  9 ++-
 .../apache/beam/sdk/io/aws/s3/S3ResourceId.java    | 18 +++++-
 .../beam/sdk/io/aws/s3/MatchResultMatcher.java     |  3 +-
 .../beam/sdk/io/aws/s3/S3FileSystemTest.java       | 23 +++++++-
 .../apache/beam/sdk/io/hdfs/HadoopFileSystem.java  |  1 +
 .../beam/sdk/io/hdfs/HadoopFileSystemTest.java     | 14 +++++
 15 files changed, 283 insertions(+), 39 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
index 99dddc5..8d48a28 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
@@ -745,6 +745,7 @@ public class FileIO {
                 MatchResult.Metadata.builder()
                     .setResourceId(metadata.resourceId())
                     .setSizeBytes(metadata.sizeBytes())
+                    .setLastModifiedMillis(metadata.lastModifiedMillis())
                     .setIsReadSeekEfficient(
                         metadata.isReadSeekEfficient() && compression == Compression.UNCOMPRESSED)
                     .build(),
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
index 3862536..8d231ac 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
@@ -272,6 +272,7 @@ class LocalFileSystem extends FileSystem<LocalResourceId> {
         .setResourceId(LocalResourceId.fromPath(file.toPath(), file.isDirectory()))
         .setIsReadSeekEfficient(true)
         .setSizeBytes(file.length())
+        .setLastModifiedMillis(file.lastModified())
         .build();
   }
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java
index c0e2bfa..3d5e114 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java
@@ -18,9 +18,11 @@
 package org.apache.beam.sdk.io.fs;
 
 import com.google.auto.value.AutoValue;
+import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.io.FileSystems;
 
 /** The result of {@link org.apache.beam.sdk.io.FileSystem#match}. */
@@ -78,14 +80,39 @@ public abstract class MatchResult {
   /** {@link Metadata} of a matched file. */
   @AutoValue
   public abstract static class Metadata implements Serializable {
+    private static final long UNKNOWN_LAST_MODIFIED_MILLIS = 0L;
+
     public abstract ResourceId resourceId();
 
     public abstract long sizeBytes();
 
     public abstract boolean isReadSeekEfficient();
 
+    /**
+     * Last modification timestamp in milliseconds since Unix epoch.
+     *
+     * <p>Note that this field is not encoded with the default {@link MetadataCoder} due to a need
+     * for compatibility with previous versions of the Beam SDK. If you want to rely on {@code
+     * lastModifiedMillis} values, be sure to explicitly set the coder to {@link MetadataCoderV2}.
+     * Otherwise, all instances will have the default value of 0, consistent with the behavior of
+     * {@link File#lastModified()}.
+     *
+     * <p>The following example sets the coder explicitly and accesses {@code lastModifiedMillis} to
+     * set record timestamps:
+     *
+     * <pre>{@code
+     * PCollection<Metadata> metadataWithTimestamp = p
+     *     .apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
+     *     .setCoder(MetadataCoderV2.of())
+     *     .apply(WithTimestamps.of(metadata -> new Instant(metadata.lastModifiedMillis())));
+     * }</pre>
+     */
+    @Experimental
+    public abstract long lastModifiedMillis();
+
     public static Builder builder() {
-      return new AutoValue_MatchResult_Metadata.Builder();
+      return new AutoValue_MatchResult_Metadata.Builder()
+          .setLastModifiedMillis(UNKNOWN_LAST_MODIFIED_MILLIS);
     }
 
     /** Builder class for {@link Metadata}. */
@@ -97,6 +124,8 @@ public abstract class MatchResult {
 
       public abstract Builder setIsReadSeekEfficient(boolean value);
 
+      public abstract Builder setLastModifiedMillis(long value);
+
       public abstract Metadata build();
     }
   }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java
index 5c9c4d7..65261e6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java
@@ -26,15 +26,24 @@ import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 
-/** A {@link Coder} for {@link Metadata}. */
+/**
+ * A {@link Coder} for {@link Metadata}.
+ *
+ * <p>The {@link Metadata#lastModifiedMillis()} field was added after this coder was already
+ * deployed, so this class decodes a default value for backwards compatibility. See {@link
+ * MetadataCoderV2} for retaining timestamp information.
+ */
 public class MetadataCoder extends AtomicCoder<Metadata> {
+  private static final MetadataCoder INSTANCE = new MetadataCoder();
   private static final ResourceIdCoder RESOURCE_ID_CODER = ResourceIdCoder.of();
   private static final VarIntCoder INT_CODER = VarIntCoder.of();
   private static final VarLongCoder LONG_CODER = VarLongCoder.of();
 
-  /** Creates a {@link MetadataCoder}. */
+  private MetadataCoder() {}
+
+  /** Returns the singleton {@link MetadataCoder} instance. */
   public static MetadataCoder of() {
-    return new MetadataCoder();
+    return INSTANCE;
   }
 
   @Override
@@ -46,14 +55,17 @@ public class MetadataCoder extends AtomicCoder<Metadata> {
 
   @Override
   public Metadata decode(InputStream is) throws IOException {
+    return decodeBuilder(is).build();
+  }
+
+  Metadata.Builder decodeBuilder(InputStream is) throws IOException {
     ResourceId resourceId = RESOURCE_ID_CODER.decode(is);
     boolean isReadSeekEfficient = INT_CODER.decode(is) == 1;
     long sizeBytes = LONG_CODER.decode(is);
     return Metadata.builder()
         .setResourceId(resourceId)
         .setIsReadSeekEfficient(isReadSeekEfficient)
-        .setSizeBytes(sizeBytes)
-        .build();
+        .setSizeBytes(sizeBytes);
   }
 
   @Override
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoderV2.java
similarity index 61%
copy from sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java
copy to sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoderV2.java
index 5c9c4d7..4e164d5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoderV2.java
@@ -20,40 +20,38 @@ package org.apache.beam.sdk.io.fs;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata.Builder;
 
-/** A {@link Coder} for {@link Metadata}. */
-public class MetadataCoder extends AtomicCoder<Metadata> {
-  private static final ResourceIdCoder RESOURCE_ID_CODER = ResourceIdCoder.of();
-  private static final VarIntCoder INT_CODER = VarIntCoder.of();
+/** A {@link Coder} for {@link Metadata} that includes {@link Metadata#lastModifiedMillis()}. */
+@Experimental
+public class MetadataCoderV2 extends AtomicCoder<Metadata> {
+  private static final MetadataCoderV2 INSTANCE = new MetadataCoderV2();
+  private static final MetadataCoder V1_CODER = MetadataCoder.of();
   private static final VarLongCoder LONG_CODER = VarLongCoder.of();
 
-  /** Creates a {@link MetadataCoder}. */
-  public static MetadataCoder of() {
-    return new MetadataCoder();
+  private MetadataCoderV2() {}
+
+  /** Returns the singleton {@link MetadataCoderV2} instance. */
+  public static MetadataCoderV2 of() {
+    return INSTANCE;
   }
 
   @Override
   public void encode(Metadata value, OutputStream os) throws IOException {
-    RESOURCE_ID_CODER.encode(value.resourceId(), os);
-    INT_CODER.encode(value.isReadSeekEfficient() ? 1 : 0, os);
-    LONG_CODER.encode(value.sizeBytes(), os);
+    V1_CODER.encode(value, os);
+    LONG_CODER.encode(value.lastModifiedMillis(), os);
   }
 
   @Override
   public Metadata decode(InputStream is) throws IOException {
-    ResourceId resourceId = RESOURCE_ID_CODER.decode(is);
-    boolean isReadSeekEfficient = INT_CODER.decode(is) == 1;
-    long sizeBytes = LONG_CODER.decode(is);
-    return Metadata.builder()
-        .setResourceId(resourceId)
-        .setIsReadSeekEfficient(isReadSeekEfficient)
-        .setSizeBytes(sizeBytes)
-        .build();
+    Builder builder = V1_CODER.decodeBuilder(is);
+    long lastModifiedMillis = LONG_CODER.decode(is);
+    return builder.setLastModifiedMillis(lastModifiedMillis).build();
   }
 
   @Override
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java
index a282acf..d46ec11 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java
@@ -32,6 +32,7 @@ import java.io.Serializable;
 import java.io.Writer;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.attribute.FileTime;
 import java.util.Arrays;
 import java.util.List;
 import java.util.zip.GZIPOutputStream;
@@ -79,24 +80,30 @@ public class FileIOTest implements Serializable {
     Path secondPath = tmpFolder.newFile("second").toPath();
     int firstSize = 37;
     int secondSize = 42;
+    long firstModified = 1541097000L;
+    long secondModified = 1541098000L;
     Files.write(firstPath, new byte[firstSize]);
     Files.write(secondPath, new byte[secondSize]);
+    Files.setLastModifiedTime(firstPath, FileTime.fromMillis(firstModified));
+    Files.setLastModifiedTime(secondPath, FileTime.fromMillis(secondModified));
+    MatchResult.Metadata firstMetadata = metadata(firstPath, firstSize, firstModified);
+    MatchResult.Metadata secondMetadata = metadata(secondPath, secondSize, secondModified);
 
     PAssert.that(
             p.apply(
                 "Match existing",
                 FileIO.match().filepattern(tmpFolder.getRoot().getAbsolutePath() + "/*")))
-        .containsInAnyOrder(metadata(firstPath, firstSize), metadata(secondPath, secondSize));
+        .containsInAnyOrder(firstMetadata, secondMetadata);
     PAssert.that(
             p.apply(
                 "Match existing with provider",
                 FileIO.match()
                     .filepattern(p.newProvider(tmpFolder.getRoot().getAbsolutePath() + "/*"))))
-        .containsInAnyOrder(metadata(firstPath, firstSize), metadata(secondPath, secondSize));
+        .containsInAnyOrder(firstMetadata, secondMetadata);
     PAssert.that(
             p.apply("Create existing", Create.of(tmpFolder.getRoot().getAbsolutePath() + "/*"))
                 .apply("MatchAll existing", FileIO.matchAll()))
-        .containsInAnyOrder(metadata(firstPath, firstSize), metadata(secondPath, secondSize));
+        .containsInAnyOrder(firstMetadata, secondMetadata);
 
     PAssert.that(
             p.apply(
@@ -232,9 +239,18 @@ public class FileIOTest implements Serializable {
 
     List<MatchResult.Metadata> expected =
         Arrays.asList(
-            metadata(basePath.resolve("first"), 42),
-            metadata(basePath.resolve("second"), 37),
-            metadata(basePath.resolve("third"), 99));
+            metadata(
+                basePath.resolve("first"),
+                42,
+                Files.getLastModifiedTime(basePath.resolve("first")).toMillis()),
+            metadata(
+                basePath.resolve("second"),
+                37,
+                Files.getLastModifiedTime(basePath.resolve("second")).toMillis()),
+            metadata(
+                basePath.resolve("third"),
+                99,
+                Files.getLastModifiedTime(basePath.resolve("third")).toMillis()));
     PAssert.that(matchMetadata).containsInAnyOrder(expected);
     PAssert.that(matchAllMetadata).containsInAnyOrder(expected);
     p.run();
@@ -309,11 +325,12 @@ public class FileIOTest implements Serializable {
     p.run();
   }
 
-  private static MatchResult.Metadata metadata(Path path, int size) {
+  private static MatchResult.Metadata metadata(Path path, int size, long lastModifiedMillis) {
     return MatchResult.Metadata.builder()
         .setResourceId(FileSystems.matchNewResource(path.toString(), false /* isDirectory */))
         .setIsReadSeekEfficient(true)
         .setSizeBytes(size)
+        .setLastModifiedMillis(lastModifiedMillis)
         .build();
   }
 
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataCoderTest.java
new file mode 100644
index 0000000..47b7a31
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataCoderTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fs;
+
+import java.nio.file.Path;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/** Tests for {@link MetadataCoder}. */
+public class MetadataCoderTest {
+
+  @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @Test
+  public void testEncodeDecodeWithDefaultLastModifiedMills() throws Exception {
+    Path filePath = tmpFolder.newFile("somefile").toPath();
+    Metadata metadata =
+        Metadata.builder()
+            .setResourceId(
+                FileSystems.matchNewResource(filePath.toString(), false /* isDirectory */))
+            .setIsReadSeekEfficient(true)
+            .setSizeBytes(1024)
+            .build();
+    CoderProperties.coderDecodeEncodeEqual(MetadataCoder.of(), metadata);
+  }
+
+  @Test(expected = AssertionError.class)
+  public void testEncodeDecodeWithCustomLastModifiedMills() throws Exception {
+    Path filePath = tmpFolder.newFile("somefile").toPath();
+    Metadata metadata =
+        Metadata.builder()
+            .setResourceId(
+                FileSystems.matchNewResource(filePath.toString(), false /* isDirectory */))
+            .setIsReadSeekEfficient(true)
+            .setSizeBytes(1024)
+            .setLastModifiedMillis(1541097000L)
+            .build();
+    // This should throw because the decoded Metadata has default lastModifiedMills.
+    CoderProperties.coderDecodeEncodeEqual(MetadataCoder.of(), metadata);
+  }
+
+  @Test
+  public void testCoderSerializable() {
+    CoderProperties.coderSerializable(MetadataCoder.of());
+  }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataCoderV2Test.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataCoderV2Test.java
new file mode 100644
index 0000000..7527f9f
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataCoderV2Test.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fs;
+
+import java.nio.file.Path;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/** Tests for {@link MetadataCoderV2}. */
+public class MetadataCoderV2Test {
+
+  @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @Test
+  public void testEncodeDecodeWithDefaultLastModifiedMills() throws Exception {
+    Path filePath = tmpFolder.newFile("somefile").toPath();
+    Metadata metadata =
+        Metadata.builder()
+            .setResourceId(
+                FileSystems.matchNewResource(filePath.toString(), false /* isDirectory */))
+            .setIsReadSeekEfficient(true)
+            .setSizeBytes(1024)
+            .build();
+    CoderProperties.coderDecodeEncodeEqual(MetadataCoderV2.of(), metadata);
+  }
+
+  @Test
+  public void testEncodeDecodeWithCustomLastModifiedMills() throws Exception {
+    Path filePath = tmpFolder.newFile("somefile").toPath();
+    Metadata metadata =
+        Metadata.builder()
+            .setResourceId(
+                FileSystems.matchNewResource(filePath.toString(), false /* isDirectory */))
+            .setIsReadSeekEfficient(true)
+            .setSizeBytes(1024)
+            .setLastModifiedMillis(1541097000L)
+            .build();
+    CoderProperties.coderDecodeEncodeEqual(MetadataCoderV2.of(), metadata);
+  }
+
+  @Test
+  public void testCoderSerializable() {
+    CoderProperties.coderSerializable(MetadataCoderV2.of());
+  }
+}
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
index db6aea9..84004bc 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
@@ -22,6 +22,7 @@ import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Precondi
 import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
 
+import com.google.api.client.util.DateTime;
 import com.google.api.services.storage.model.Objects;
 import com.google.api.services.storage.model.StorageObject;
 import java.io.FileNotFoundException;
@@ -268,6 +269,8 @@ class GcsFileSystem extends FileSystem<GcsResourceId> {
             .setResourceId(GcsResourceId.fromGcsPath(GcsPath.fromObject(storageObject)));
     BigInteger size = firstNonNull(storageObject.getSize(), BigInteger.ZERO);
     ret.setSizeBytes(size.longValue());
+    DateTime lastModified = firstNonNull(storageObject.getUpdated(), new DateTime(0L));
+    ret.setLastModifiedMillis(lastModified.getValue());
     return ret.build();
   }
 
diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
index 1276a79..39ebcbb 100644
--- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
+++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
@@ -47,6 +47,7 @@ import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -318,7 +319,8 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
         if (wildcardRegexp.matcher(objectSummary.getKey()).matches()) {
           S3ResourceId expandedPath =
               S3ResourceId.fromComponents(objectSummary.getBucketName(), objectSummary.getKey())
-                  .withSize(objectSummary.getSize());
+                  .withSize(objectSummary.getSize())
+                  .withLastModified(objectSummary.getLastModified());
           LOG.debug("Expanded S3 object path {}", expandedPath);
           expandedPaths.add(expandedPath);
         }
@@ -373,7 +375,8 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
         MatchResult.Status.OK,
         ImmutableList.of(
             createBeamMetadata(
-                path.withSize(s3Metadata.getContentLength()),
+                path.withSize(s3Metadata.getContentLength())
+                    .withLastModified(s3Metadata.getLastModified()),
                 Strings.nullToEmpty(s3Metadata.getContentEncoding()))));
   }
 
@@ -382,10 +385,12 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
     checkArgument(path.getSize().isPresent(), "path has size");
     checkNotNull(contentEncoding, "contentEncoding");
     boolean isReadSeekEfficient = !NON_READ_SEEK_EFFICIENT_ENCODINGS.contains(contentEncoding);
+
     return MatchResult.Metadata.builder()
         .setIsReadSeekEfficient(isReadSeekEfficient)
         .setResourceId(path)
         .setSizeBytes(path.getSize().get())
+        .setLastModifiedMillis(path.getLastModified().transform(Date::getTime).or(0L))
         .build();
   }
 
diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3ResourceId.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3ResourceId.java
index 120d08b..1c9a2c3 100644
--- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3ResourceId.java
+++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3ResourceId.java
@@ -21,6 +21,7 @@ import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Precondi
 import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
 
+import java.util.Date;
 import java.util.Objects;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -44,19 +45,22 @@ class S3ResourceId implements ResourceId {
   private final String bucket;
   private final String key;
   private final Long size;
+  private final Date lastModified;
 
-  private S3ResourceId(String bucket, String key, @Nullable Long size) {
+  private S3ResourceId(
+      String bucket, String key, @Nullable Long size, @Nullable Date lastModified) {
     checkArgument(!Strings.isNullOrEmpty(bucket), "bucket");
     this.bucket = bucket;
     this.key = checkNotNull(key, "key");
     this.size = size;
+    this.lastModified = lastModified;
   }
 
   static S3ResourceId fromComponents(String bucket, String key) {
     if (!key.startsWith("/")) {
       key = "/" + key;
     }
-    return new S3ResourceId(bucket, key, null);
+    return new S3ResourceId(bucket, key, null, null);
   }
 
   static S3ResourceId fromUri(String uri) {
@@ -85,7 +89,15 @@ class S3ResourceId implements ResourceId {
   }
 
   S3ResourceId withSize(long size) {
-    return new S3ResourceId(bucket, key, size);
+    return new S3ResourceId(bucket, key, size, lastModified);
+  }
+
+  Optional<Date> getLastModified() {
+    return Optional.fromNullable(lastModified);
+  }
+
+  S3ResourceId withLastModified(Date lastModified) {
+    return new S3ResourceId(bucket, key, size, lastModified);
   }
 
   @Override
diff --git a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/MatchResultMatcher.java b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/MatchResultMatcher.java
index aabbfb8..c0b3105 100644
--- a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/MatchResultMatcher.java
+++ b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/MatchResultMatcher.java
@@ -58,10 +58,11 @@ class MatchResultMatcher extends BaseMatcher<MatchResult> {
   }
 
   static MatchResultMatcher create(
-      long sizeBytes, ResourceId resourceId, boolean isReadSeekEfficient) {
+      long sizeBytes, long lastModifiedMillis, ResourceId resourceId, boolean isReadSeekEfficient) {
     return create(
         MatchResult.Metadata.builder()
             .setSizeBytes(sizeBytes)
+            .setLastModifiedMillis(lastModifiedMillis)
             .setResourceId(resourceId)
             .setIsReadSeekEfficient(isReadSeekEfficient)
             .build());
diff --git a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java
index 46640e3..9f2650e 100644
--- a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java
+++ b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java
@@ -67,6 +67,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
 import org.apache.beam.sdk.io.aws.options.S3Options;
 import org.apache.beam.sdk.io.fs.MatchResult;
@@ -330,9 +331,11 @@ public class S3FileSystemTest {
     S3FileSystem s3FileSystem = buildMockedS3FileSystem(s3Options());
 
     S3ResourceId path = S3ResourceId.fromUri("s3://testbucket/testdirectory/filethatexists");
+    long lastModifiedMillis = 1540000000000L;
     ObjectMetadata s3ObjectMetadata = new ObjectMetadata();
     s3ObjectMetadata.setContentLength(100);
     s3ObjectMetadata.setContentEncoding("read-seek-efficient");
+    s3ObjectMetadata.setLastModified(new Date(lastModifiedMillis));
     when(s3FileSystem
             .getAmazonS3Client()
             .getObjectMetadata(
@@ -348,6 +351,7 @@ public class S3FileSystemTest {
             ImmutableList.of(
                 MatchResult.Metadata.builder()
                     .setSizeBytes(100)
+                    .setLastModifiedMillis(lastModifiedMillis)
                     .setResourceId(path)
                     .setIsReadSeekEfficient(true)
                     .build())));
@@ -358,8 +362,10 @@ public class S3FileSystemTest {
     S3FileSystem s3FileSystem = buildMockedS3FileSystem(s3Options());
 
     S3ResourceId path = S3ResourceId.fromUri("s3://testbucket/testdirectory/filethatexists");
+    long lastModifiedMillis = 1540000000000L;
     ObjectMetadata s3ObjectMetadata = new ObjectMetadata();
     s3ObjectMetadata.setContentLength(100);
+    s3ObjectMetadata.setLastModified(new Date(lastModifiedMillis));
     s3ObjectMetadata.setContentEncoding("gzip");
     when(s3FileSystem
             .getAmazonS3Client()
@@ -376,6 +382,7 @@ public class S3FileSystemTest {
             ImmutableList.of(
                 MatchResult.Metadata.builder()
                     .setSizeBytes(100)
+                    .setLastModifiedMillis(lastModifiedMillis)
                     .setResourceId(path)
                     .setIsReadSeekEfficient(false)
                     .build())));
@@ -386,8 +393,10 @@ public class S3FileSystemTest {
     S3FileSystem s3FileSystem = buildMockedS3FileSystem(s3Options());
 
     S3ResourceId path = S3ResourceId.fromUri("s3://testbucket/testdirectory/filethatexists");
+    long lastModifiedMillis = 1540000000000L;
     ObjectMetadata s3ObjectMetadata = new ObjectMetadata();
     s3ObjectMetadata.setContentLength(100);
+    s3ObjectMetadata.setLastModified(new Date(lastModifiedMillis));
     s3ObjectMetadata.setContentEncoding(null);
     when(s3FileSystem
             .getAmazonS3Client()
@@ -404,6 +413,7 @@ public class S3FileSystemTest {
             ImmutableList.of(
                 MatchResult.Metadata.builder()
                     .setSizeBytes(100)
+                    .setLastModifiedMillis(lastModifiedMillis)
                     .setResourceId(path)
                     .setIsReadSeekEfficient(true)
                     .build())));
@@ -488,12 +498,14 @@ public class S3FileSystemTest {
     firstMatch.setBucketName(path.getBucket());
     firstMatch.setKey("foo/bar0baz");
     firstMatch.setSize(100);
+    firstMatch.setLastModified(new Date(1540000000001L));
 
     // Expected to not be returned; prefix matches, but substring after wildcard does not
     S3ObjectSummary secondMatch = new S3ObjectSummary();
     secondMatch.setBucketName(path.getBucket());
     secondMatch.setKey("foo/bar1qux");
     secondMatch.setSize(200);
+    secondMatch.setLastModified(new Date(1540000000002L));
 
     // Expected first request returns continuation token
     ListObjectsV2Result firstResult = new ListObjectsV2Result();
@@ -517,6 +529,7 @@ public class S3FileSystemTest {
     thirdMatch.setBucketName(path.getBucket());
     thirdMatch.setKey("foo/bar2baz");
     thirdMatch.setSize(300);
+    thirdMatch.setLastModified(new Date(1540000000003L));
 
     // Expected second request returns third prefix match and no continuation token
     ListObjectsV2Result secondResult = new ListObjectsV2Result();
@@ -542,6 +555,7 @@ public class S3FileSystemTest {
                         S3ResourceId.fromComponents(
                             firstMatch.getBucketName(), firstMatch.getKey()))
                     .setSizeBytes(firstMatch.getSize())
+                    .setLastModifiedMillis(firstMatch.getLastModified().getTime())
                     .build(),
                 MatchResult.Metadata.builder()
                     .setIsReadSeekEfficient(true)
@@ -549,6 +563,7 @@ public class S3FileSystemTest {
                         S3ResourceId.fromComponents(
                             thirdMatch.getBucketName(), thirdMatch.getKey()))
                     .setSizeBytes(thirdMatch.getSize())
+                    .setLastModifiedMillis(thirdMatch.getLastModified().getTime())
                     .build())));
   }
 
@@ -569,12 +584,14 @@ public class S3FileSystemTest {
     firstMatch.setBucketName(path.getBucket());
     firstMatch.setKey("foo/bar\\baz0");
     firstMatch.setSize(100);
+    firstMatch.setLastModified(new Date(1540000000001L));
 
     // Expected to not be returned; prefix matches, but substring after wildcard does not
     S3ObjectSummary secondMatch = new S3ObjectSummary();
     secondMatch.setBucketName(path.getBucket());
     secondMatch.setKey("foo/bar/baz1");
     secondMatch.setSize(200);
+    secondMatch.setLastModified(new Date(1540000000002L));
 
     // Expected first request returns continuation token
     ListObjectsV2Result result = new ListObjectsV2Result();
@@ -600,6 +617,7 @@ public class S3FileSystemTest {
                         S3ResourceId.fromComponents(
                             firstMatch.getBucketName(), firstMatch.getKey()))
                     .setSizeBytes(firstMatch.getSize())
+                    .setLastModifiedMillis(firstMatch.getLastModified().getTime())
                     .build())));
   }
 
@@ -636,6 +654,7 @@ public class S3FileSystemTest {
     S3ResourceId pathExist = S3ResourceId.fromUri("s3://testbucket/testdirectory/filethatexists");
     ObjectMetadata s3ObjectMetadata = new ObjectMetadata();
     s3ObjectMetadata.setContentLength(100);
+    s3ObjectMetadata.setLastModified(new Date(1540000000000L));
     s3ObjectMetadata.setContentEncoding("not-gzip");
     when(s3FileSystem
             .getAmazonS3Client()
@@ -651,6 +670,7 @@ public class S3FileSystemTest {
     foundListObject.setBucketName(pathGlob.getBucket());
     foundListObject.setKey("path/part-0");
     foundListObject.setSize(200);
+    foundListObject.setLastModified(new Date(1541000000000L));
 
     ListObjectsV2Result listObjectsResult = new ListObjectsV2Result();
     listObjectsResult.setNextContinuationToken(null);
@@ -679,9 +699,10 @@ public class S3FileSystemTest {
             MatchResultMatcher.create(MatchResult.Status.NOT_FOUND, new FileNotFoundException()),
             MatchResultMatcher.create(
                 MatchResult.Status.ERROR, new IOException(forbiddenException)),
-            MatchResultMatcher.create(100, pathExist, true),
+            MatchResultMatcher.create(100, 1540000000000L, pathExist, true),
             MatchResultMatcher.create(
                 200,
+                1541000000000L,
                 S3ResourceId.fromComponents(pathGlob.getBucket(), foundListObject.getKey()),
                 true)));
   }
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
index 0bd556f..4eefc86 100644
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
@@ -102,6 +102,7 @@ class HadoopFileSystem extends FileSystem<HadoopResourceId> {
                     .setResourceId(new HadoopResourceId(uri))
                     .setIsReadSeekEfficient(true)
                     .setSizeBytes(fileStatus.getLen())
+                    .setLastModifiedMillis(fileStatus.getModificationTime())
                     .build());
           }
         }
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
index 782e75c..acb550f 100644
--- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
@@ -158,6 +158,7 @@ public class HadoopFileSystemTest {
                         .setResourceId(testPath("testFileB"))
                         .setIsReadSeekEfficient(true)
                         .setSizeBytes("testDataB".getBytes(StandardCharsets.UTF_8).length)
+                        .setLastModifiedMillis(lastModified("testFileB"))
                         .build()))));
   }
 
@@ -188,11 +189,13 @@ public class HadoopFileSystemTest {
                 .setResourceId(testPath("testFileAA"))
                 .setIsReadSeekEfficient(true)
                 .setSizeBytes("testDataAA".getBytes(StandardCharsets.UTF_8).length)
+                .setLastModifiedMillis(lastModified("testFileAA"))
                 .build(),
             Metadata.builder()
                 .setResourceId(testPath("testFileA"))
                 .setIsReadSeekEfficient(true)
                 .setSizeBytes("testDataA".getBytes(StandardCharsets.UTF_8).length)
+                .setLastModifiedMillis(lastModified("testFileA"))
                 .build()));
   }
 
@@ -223,6 +226,7 @@ public class HadoopFileSystemTest {
                         .setResourceId(testPath("testFileAA"))
                         .setIsReadSeekEfficient(true)
                         .setSizeBytes("testDataAA".getBytes(StandardCharsets.UTF_8).length)
+                        .setLastModifiedMillis(lastModified("testFileAA"))
                         .build())),
             MatchResult.create(Status.NOT_FOUND, ImmutableList.of()),
             MatchResult.create(
@@ -232,6 +236,7 @@ public class HadoopFileSystemTest {
                         .setResourceId(testPath("testFileBB"))
                         .setIsReadSeekEfficient(true)
                         .setSizeBytes("testDataBB".getBytes(StandardCharsets.UTF_8).length)
+                        .setLastModifiedMillis(lastModified("testFileBB"))
                         .build())));
     assertThat(matchResults, equalTo(expected));
   }
@@ -258,11 +263,13 @@ public class HadoopFileSystemTest {
                 .setResourceId(testPath("renameFileA"))
                 .setIsReadSeekEfficient(true)
                 .setSizeBytes("testDataA".getBytes(StandardCharsets.UTF_8).length)
+                .setLastModifiedMillis(lastModified("renameFileA"))
                 .build(),
             Metadata.builder()
                 .setResourceId(testPath("renameFileB"))
                 .setIsReadSeekEfficient(true)
                 .setSizeBytes("testDataB".getBytes(StandardCharsets.UTF_8).length)
+                .setLastModifiedMillis(lastModified("renameFileB"))
                 .build()));
 
     // ensure files exist
@@ -377,6 +384,13 @@ public class HadoopFileSystemTest {
     }
   }
 
+  /** Returns the modification time the Hadoop FileSystem reports for {@code relativePath}. */
+  private long lastModified(String relativePath) throws java.io.IOException {
+    return fileSystem.fileSystem
+        .getFileStatus(testPath(relativePath).toPath())
+        .getModificationTime();
+  }
+
   private HadoopResourceId testPath(String relativePath) {
     return new HadoopResourceId(hdfsClusterBaseUri.resolve(relativePath));
   }