You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2019/01/18 09:25:24 UTC
[beam] branch master updated: [BEAM-5910] Add lastModified field to
MatchResult.Metadata
This is an automated email from the ASF dual-hosted git repository.
iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 410d6c7 [BEAM-5910] Add lastModified field to MatchResult.Metadata
new 137e86a Merge pull request #6914: [BEAM-5910] Add lastModified field to MatchResult.Metadata
410d6c7 is described below
commit 410d6c7b5f933dcb0280894553c1e576ee4e4884
Author: Jeff Klukas <je...@klukas.net>
AuthorDate: Thu Nov 1 14:50:32 2018 -0400
[BEAM-5910] Add lastModified field to MatchResult.Metadata
---
.../main/java/org/apache/beam/sdk/io/FileIO.java | 1 +
.../org/apache/beam/sdk/io/LocalFileSystem.java | 1 +
.../org/apache/beam/sdk/io/fs/MatchResult.java | 31 ++++++++++-
.../org/apache/beam/sdk/io/fs/MetadataCoder.java | 22 ++++++--
.../{MetadataCoder.java => MetadataCoderV2.java} | 36 ++++++------
.../java/org/apache/beam/sdk/io/FileIOTest.java | 31 ++++++++---
.../apache/beam/sdk/io/fs/MetadataCoderTest.java | 65 ++++++++++++++++++++++
.../apache/beam/sdk/io/fs/MetadataCoderV2Test.java | 64 +++++++++++++++++++++
.../sdk/extensions/gcp/storage/GcsFileSystem.java | 3 +
.../apache/beam/sdk/io/aws/s3/S3FileSystem.java | 9 ++-
.../apache/beam/sdk/io/aws/s3/S3ResourceId.java | 18 +++++-
.../beam/sdk/io/aws/s3/MatchResultMatcher.java | 3 +-
.../beam/sdk/io/aws/s3/S3FileSystemTest.java | 23 +++++++-
.../apache/beam/sdk/io/hdfs/HadoopFileSystem.java | 1 +
.../beam/sdk/io/hdfs/HadoopFileSystemTest.java | 14 +++++
15 files changed, 283 insertions(+), 39 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
index 99dddc5..8d48a28 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
@@ -745,6 +745,7 @@ public class FileIO {
MatchResult.Metadata.builder()
.setResourceId(metadata.resourceId())
.setSizeBytes(metadata.sizeBytes())
+ .setLastModifiedMillis(metadata.lastModifiedMillis())
.setIsReadSeekEfficient(
metadata.isReadSeekEfficient() && compression == Compression.UNCOMPRESSED)
.build(),
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
index 3862536..8d231ac 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
@@ -272,6 +272,7 @@ class LocalFileSystem extends FileSystem<LocalResourceId> {
.setResourceId(LocalResourceId.fromPath(file.toPath(), file.isDirectory()))
.setIsReadSeekEfficient(true)
.setSizeBytes(file.length())
+ .setLastModifiedMillis(file.lastModified())
.build();
}
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java
index c0e2bfa..3d5e114 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java
@@ -18,9 +18,11 @@
package org.apache.beam.sdk.io.fs;
import com.google.auto.value.AutoValue;
+import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.io.FileSystems;
/** The result of {@link org.apache.beam.sdk.io.FileSystem#match}. */
@@ -78,14 +80,39 @@ public abstract class MatchResult {
/** {@link Metadata} of a matched file. */
@AutoValue
public abstract static class Metadata implements Serializable {
+ private static final long UNKNOWN_LAST_MODIFIED_MILLIS = 0L;
+
public abstract ResourceId resourceId();
public abstract long sizeBytes();
public abstract boolean isReadSeekEfficient();
+ /**
+ * Last modification timestamp in milliseconds since Unix epoch.
+ *
+ * <p>Note that this field is not encoded with the default {@link MetadataCoder} due to a need
+ * for compatibility with previous versions of the Beam SDK. If you want to rely on {@code
+ * lastModifiedMillis} values, be sure to explicitly set the coder to {@link MetadataCoderV2}.
+ * Otherwise, every decoded instance will carry the default value of 0, consistent with how
+ * {@link File#lastModified()} reports an unknown modification time.
+ *
+ * <p>The following example sets the coder explicitly and accesses {@code lastModifiedMillis} to
+ * set record timestamps:
+ *
+ * <pre>{@code
+ * PCollection<Metadata> metadataWithTimestamp = p
+ * .apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
+ * .setCoder(MetadataCoderV2.of())
+ * .apply(WithTimestamps.of(metadata -> new Instant(metadata.lastModifiedMillis())));
+ * }</pre>
+ */
+ @Experimental
+ public abstract long lastModifiedMillis();
+
public static Builder builder() {
- return new AutoValue_MatchResult_Metadata.Builder();
+ return new AutoValue_MatchResult_Metadata.Builder()
+ .setLastModifiedMillis(UNKNOWN_LAST_MODIFIED_MILLIS);
}
/** Builder class for {@link Metadata}. */
@@ -97,6 +124,8 @@ public abstract class MatchResult {
public abstract Builder setIsReadSeekEfficient(boolean value);
+ public abstract Builder setLastModifiedMillis(long value);
+
public abstract Metadata build();
}
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java
index 5c9c4d7..65261e6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java
@@ -26,15 +26,24 @@ import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
-/** A {@link Coder} for {@link Metadata}. */
+/**
+ * A {@link Coder} for {@link Metadata}.
+ *
+ * <p>The {@link Metadata#lastModifiedMillis()} field was added after this coder was already
+ * deployed, so this class decodes a default value for backwards compatibility. See {@link
+ * MetadataCoderV2} for retaining timestamp information.
+ */
public class MetadataCoder extends AtomicCoder<Metadata> {
+ private static final MetadataCoder INSTANCE = new MetadataCoder();
private static final ResourceIdCoder RESOURCE_ID_CODER = ResourceIdCoder.of();
private static final VarIntCoder INT_CODER = VarIntCoder.of();
private static final VarLongCoder LONG_CODER = VarLongCoder.of();
- /** Creates a {@link MetadataCoder}. */
+ private MetadataCoder() {}
+
+ /** Returns the singleton {@link MetadataCoder} instance. */
public static MetadataCoder of() {
- return new MetadataCoder();
+ return INSTANCE;
}
@Override
@@ -46,14 +55,17 @@ public class MetadataCoder extends AtomicCoder<Metadata> {
@Override
public Metadata decode(InputStream is) throws IOException {
+ return decodeBuilder(is).build();
+ }
+
+ Metadata.Builder decodeBuilder(InputStream is) throws IOException {
ResourceId resourceId = RESOURCE_ID_CODER.decode(is);
boolean isReadSeekEfficient = INT_CODER.decode(is) == 1;
long sizeBytes = LONG_CODER.decode(is);
return Metadata.builder()
.setResourceId(resourceId)
.setIsReadSeekEfficient(isReadSeekEfficient)
- .setSizeBytes(sizeBytes)
- .build();
+ .setSizeBytes(sizeBytes);
}
@Override
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoderV2.java
similarity index 61%
copy from sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java
copy to sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoderV2.java
index 5c9c4d7..4e164d5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoderV2.java
@@ -20,40 +20,38 @@ package org.apache.beam.sdk.io.fs;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata.Builder;
-/** A {@link Coder} for {@link Metadata}. */
-public class MetadataCoder extends AtomicCoder<Metadata> {
- private static final ResourceIdCoder RESOURCE_ID_CODER = ResourceIdCoder.of();
- private static final VarIntCoder INT_CODER = VarIntCoder.of();
+/** A {@link Coder} for {@link Metadata} that includes {@link Metadata#lastModifiedMillis()}. */
+@Experimental
+public class MetadataCoderV2 extends AtomicCoder<Metadata> {
+ private static final MetadataCoderV2 INSTANCE = new MetadataCoderV2();
+ private static final MetadataCoder V1_CODER = MetadataCoder.of();
private static final VarLongCoder LONG_CODER = VarLongCoder.of();
- /** Creates a {@link MetadataCoder}. */
- public static MetadataCoder of() {
- return new MetadataCoder();
+ private MetadataCoderV2() {}
+
+ /** Returns the singleton {@link MetadataCoderV2} instance. */
+ public static MetadataCoderV2 of() {
+ return INSTANCE;
}
@Override
public void encode(Metadata value, OutputStream os) throws IOException {
- RESOURCE_ID_CODER.encode(value.resourceId(), os);
- INT_CODER.encode(value.isReadSeekEfficient() ? 1 : 0, os);
- LONG_CODER.encode(value.sizeBytes(), os);
+ V1_CODER.encode(value, os);
+ LONG_CODER.encode(value.lastModifiedMillis(), os);
}
@Override
public Metadata decode(InputStream is) throws IOException {
- ResourceId resourceId = RESOURCE_ID_CODER.decode(is);
- boolean isReadSeekEfficient = INT_CODER.decode(is) == 1;
- long sizeBytes = LONG_CODER.decode(is);
- return Metadata.builder()
- .setResourceId(resourceId)
- .setIsReadSeekEfficient(isReadSeekEfficient)
- .setSizeBytes(sizeBytes)
- .build();
+ Builder builder = V1_CODER.decodeBuilder(is);
+ long lastModifiedMillis = LONG_CODER.decode(is);
+ return builder.setLastModifiedMillis(lastModifiedMillis).build();
}
@Override
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java
index a282acf..d46ec11 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java
@@ -32,6 +32,7 @@ import java.io.Serializable;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.nio.file.attribute.FileTime;
import java.util.Arrays;
import java.util.List;
import java.util.zip.GZIPOutputStream;
@@ -79,24 +80,30 @@ public class FileIOTest implements Serializable {
Path secondPath = tmpFolder.newFile("second").toPath();
int firstSize = 37;
int secondSize = 42;
+ long firstModified = 1541097000L;
+ long secondModified = 1541098000L;
Files.write(firstPath, new byte[firstSize]);
Files.write(secondPath, new byte[secondSize]);
+ Files.setLastModifiedTime(firstPath, FileTime.fromMillis(firstModified));
+ Files.setLastModifiedTime(secondPath, FileTime.fromMillis(secondModified));
+ MatchResult.Metadata firstMetadata = metadata(firstPath, firstSize, firstModified);
+ MatchResult.Metadata secondMetadata = metadata(secondPath, secondSize, secondModified);
PAssert.that(
p.apply(
"Match existing",
FileIO.match().filepattern(tmpFolder.getRoot().getAbsolutePath() + "/*")))
- .containsInAnyOrder(metadata(firstPath, firstSize), metadata(secondPath, secondSize));
+ .containsInAnyOrder(firstMetadata, secondMetadata);
PAssert.that(
p.apply(
"Match existing with provider",
FileIO.match()
.filepattern(p.newProvider(tmpFolder.getRoot().getAbsolutePath() + "/*"))))
- .containsInAnyOrder(metadata(firstPath, firstSize), metadata(secondPath, secondSize));
+ .containsInAnyOrder(firstMetadata, secondMetadata);
PAssert.that(
p.apply("Create existing", Create.of(tmpFolder.getRoot().getAbsolutePath() + "/*"))
.apply("MatchAll existing", FileIO.matchAll()))
- .containsInAnyOrder(metadata(firstPath, firstSize), metadata(secondPath, secondSize));
+ .containsInAnyOrder(firstMetadata, secondMetadata);
PAssert.that(
p.apply(
@@ -232,9 +239,18 @@ public class FileIOTest implements Serializable {
List<MatchResult.Metadata> expected =
Arrays.asList(
- metadata(basePath.resolve("first"), 42),
- metadata(basePath.resolve("second"), 37),
- metadata(basePath.resolve("third"), 99));
+ metadata(
+ basePath.resolve("first"),
+ 42,
+ Files.getLastModifiedTime(basePath.resolve("first")).toMillis()),
+ metadata(
+ basePath.resolve("second"),
+ 37,
+ Files.getLastModifiedTime(basePath.resolve("second")).toMillis()),
+ metadata(
+ basePath.resolve("third"),
+ 99,
+ Files.getLastModifiedTime(basePath.resolve("third")).toMillis()));
PAssert.that(matchMetadata).containsInAnyOrder(expected);
PAssert.that(matchAllMetadata).containsInAnyOrder(expected);
p.run();
@@ -309,11 +325,12 @@ public class FileIOTest implements Serializable {
p.run();
}
- private static MatchResult.Metadata metadata(Path path, int size) {
+ private static MatchResult.Metadata metadata(Path path, int size, long lastModifiedMillis) {
return MatchResult.Metadata.builder()
.setResourceId(FileSystems.matchNewResource(path.toString(), false /* isDirectory */))
.setIsReadSeekEfficient(true)
.setSizeBytes(size)
+ .setLastModifiedMillis(lastModifiedMillis)
.build();
}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataCoderTest.java
new file mode 100644
index 0000000..47b7a31
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataCoderTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fs;
+
+import java.nio.file.Path;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/** Tests for {@link MetadataCoder}. */
+public class MetadataCoderTest {
+
+ @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ @Test
+ public void testEncodeDecodeWithDefaultLastModifiedMills() throws Exception {
+ Path filePath = tmpFolder.newFile("somefile").toPath();
+ Metadata metadata =
+ Metadata.builder()
+ .setResourceId(
+ FileSystems.matchNewResource(filePath.toString(), false /* isDirectory */))
+ .setIsReadSeekEfficient(true)
+ .setSizeBytes(1024)
+ .build();
+ CoderProperties.coderDecodeEncodeEqual(MetadataCoder.of(), metadata);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void testEncodeDecodeWithCustomLastModifiedMills() throws Exception {
+ Path filePath = tmpFolder.newFile("somefile").toPath();
+ Metadata metadata =
+ Metadata.builder()
+ .setResourceId(
+ FileSystems.matchNewResource(filePath.toString(), false /* isDirectory */))
+ .setIsReadSeekEfficient(true)
+ .setSizeBytes(1024)
+ .setLastModifiedMillis(1541097000L)
+ .build();
+ // This should throw because the decoded Metadata has the default lastModifiedMillis.
+ CoderProperties.coderDecodeEncodeEqual(MetadataCoder.of(), metadata);
+ }
+
+ @Test
+ public void testCoderSerializable() {
+ CoderProperties.coderSerializable(MetadataCoder.of());
+ }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataCoderV2Test.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataCoderV2Test.java
new file mode 100644
index 0000000..7527f9f
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataCoderV2Test.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fs;
+
+import java.nio.file.Path;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/** Tests for {@link MetadataCoderV2}. */
+public class MetadataCoderV2Test {
+
+ @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ @Test
+ public void testEncodeDecodeWithDefaultLastModifiedMills() throws Exception {
+ Path filePath = tmpFolder.newFile("somefile").toPath();
+ Metadata metadata =
+ Metadata.builder()
+ .setResourceId(
+ FileSystems.matchNewResource(filePath.toString(), false /* isDirectory */))
+ .setIsReadSeekEfficient(true)
+ .setSizeBytes(1024)
+ .build();
+ CoderProperties.coderDecodeEncodeEqual(MetadataCoderV2.of(), metadata);
+ }
+
+ @Test
+ public void testEncodeDecodeWithCustomLastModifiedMills() throws Exception {
+ Path filePath = tmpFolder.newFile("somefile").toPath();
+ Metadata metadata =
+ Metadata.builder()
+ .setResourceId(
+ FileSystems.matchNewResource(filePath.toString(), false /* isDirectory */))
+ .setIsReadSeekEfficient(true)
+ .setSizeBytes(1024)
+ .setLastModifiedMillis(1541097000L)
+ .build();
+ CoderProperties.coderDecodeEncodeEqual(MetadataCoderV2.of(), metadata);
+ }
+
+ @Test
+ public void testCoderSerializable() {
+ CoderProperties.coderSerializable(MetadataCoderV2.of());
+ }
+}
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
index db6aea9..84004bc 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
@@ -22,6 +22,7 @@ import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Precondi
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
+import com.google.api.client.util.DateTime;
import com.google.api.services.storage.model.Objects;
import com.google.api.services.storage.model.StorageObject;
import java.io.FileNotFoundException;
@@ -268,6 +269,8 @@ class GcsFileSystem extends FileSystem<GcsResourceId> {
.setResourceId(GcsResourceId.fromGcsPath(GcsPath.fromObject(storageObject)));
BigInteger size = firstNonNull(storageObject.getSize(), BigInteger.ZERO);
ret.setSizeBytes(size.longValue());
+ DateTime lastModified = firstNonNull(storageObject.getUpdated(), new DateTime(0L));
+ ret.setLastModifiedMillis(lastModified.getValue());
return ret.build();
}
diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
index 1276a79..39ebcbb 100644
--- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
+++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
@@ -47,6 +47,7 @@ import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -318,7 +319,8 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
if (wildcardRegexp.matcher(objectSummary.getKey()).matches()) {
S3ResourceId expandedPath =
S3ResourceId.fromComponents(objectSummary.getBucketName(), objectSummary.getKey())
- .withSize(objectSummary.getSize());
+ .withSize(objectSummary.getSize())
+ .withLastModified(objectSummary.getLastModified());
LOG.debug("Expanded S3 object path {}", expandedPath);
expandedPaths.add(expandedPath);
}
@@ -373,7 +375,8 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
MatchResult.Status.OK,
ImmutableList.of(
createBeamMetadata(
- path.withSize(s3Metadata.getContentLength()),
+ path.withSize(s3Metadata.getContentLength())
+ .withLastModified(s3Metadata.getLastModified()),
Strings.nullToEmpty(s3Metadata.getContentEncoding()))));
}
@@ -382,10 +385,12 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
checkArgument(path.getSize().isPresent(), "path has size");
checkNotNull(contentEncoding, "contentEncoding");
boolean isReadSeekEfficient = !NON_READ_SEEK_EFFICIENT_ENCODINGS.contains(contentEncoding);
+
return MatchResult.Metadata.builder()
.setIsReadSeekEfficient(isReadSeekEfficient)
.setResourceId(path)
.setSizeBytes(path.getSize().get())
+ .setLastModifiedMillis(path.getLastModified().transform(Date::getTime).or(0L))
.build();
}
diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3ResourceId.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3ResourceId.java
index 120d08b..1c9a2c3 100644
--- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3ResourceId.java
+++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3ResourceId.java
@@ -21,6 +21,7 @@ import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Precondi
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
+import java.util.Date;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -44,19 +45,22 @@ class S3ResourceId implements ResourceId {
private final String bucket;
private final String key;
private final Long size;
+ private final Date lastModified;
- private S3ResourceId(String bucket, String key, @Nullable Long size) {
+ private S3ResourceId(
+ String bucket, String key, @Nullable Long size, @Nullable Date lastModified) {
checkArgument(!Strings.isNullOrEmpty(bucket), "bucket");
this.bucket = bucket;
this.key = checkNotNull(key, "key");
this.size = size;
+ this.lastModified = lastModified;
}
static S3ResourceId fromComponents(String bucket, String key) {
if (!key.startsWith("/")) {
key = "/" + key;
}
- return new S3ResourceId(bucket, key, null);
+ return new S3ResourceId(bucket, key, null, null);
}
static S3ResourceId fromUri(String uri) {
@@ -85,7 +89,15 @@ class S3ResourceId implements ResourceId {
}
S3ResourceId withSize(long size) {
- return new S3ResourceId(bucket, key, size);
+ return new S3ResourceId(bucket, key, size, lastModified);
+ }
+
+ Optional<Date> getLastModified() {
+ return Optional.fromNullable(lastModified);
+ }
+
+ S3ResourceId withLastModified(Date lastModified) {
+ return new S3ResourceId(bucket, key, size, lastModified);
}
@Override
diff --git a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/MatchResultMatcher.java b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/MatchResultMatcher.java
index aabbfb8..c0b3105 100644
--- a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/MatchResultMatcher.java
+++ b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/MatchResultMatcher.java
@@ -58,10 +58,11 @@ class MatchResultMatcher extends BaseMatcher<MatchResult> {
}
static MatchResultMatcher create(
- long sizeBytes, ResourceId resourceId, boolean isReadSeekEfficient) {
+ long sizeBytes, long lastModifiedMillis, ResourceId resourceId, boolean isReadSeekEfficient) {
return create(
MatchResult.Metadata.builder()
.setSizeBytes(sizeBytes)
+ .setLastModifiedMillis(lastModifiedMillis)
.setResourceId(resourceId)
.setIsReadSeekEfficient(isReadSeekEfficient)
.build());
diff --git a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java
index 46640e3..9f2650e 100644
--- a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java
+++ b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java
@@ -67,6 +67,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
+import java.util.Date;
import java.util.List;
import org.apache.beam.sdk.io.aws.options.S3Options;
import org.apache.beam.sdk.io.fs.MatchResult;
@@ -330,9 +331,11 @@ public class S3FileSystemTest {
S3FileSystem s3FileSystem = buildMockedS3FileSystem(s3Options());
S3ResourceId path = S3ResourceId.fromUri("s3://testbucket/testdirectory/filethatexists");
+ long lastModifiedMillis = 1540000000000L;
ObjectMetadata s3ObjectMetadata = new ObjectMetadata();
s3ObjectMetadata.setContentLength(100);
s3ObjectMetadata.setContentEncoding("read-seek-efficient");
+ s3ObjectMetadata.setLastModified(new Date(lastModifiedMillis));
when(s3FileSystem
.getAmazonS3Client()
.getObjectMetadata(
@@ -348,6 +351,7 @@ public class S3FileSystemTest {
ImmutableList.of(
MatchResult.Metadata.builder()
.setSizeBytes(100)
+ .setLastModifiedMillis(lastModifiedMillis)
.setResourceId(path)
.setIsReadSeekEfficient(true)
.build())));
@@ -358,8 +362,10 @@ public class S3FileSystemTest {
S3FileSystem s3FileSystem = buildMockedS3FileSystem(s3Options());
S3ResourceId path = S3ResourceId.fromUri("s3://testbucket/testdirectory/filethatexists");
+ long lastModifiedMillis = 1540000000000L;
ObjectMetadata s3ObjectMetadata = new ObjectMetadata();
s3ObjectMetadata.setContentLength(100);
+ s3ObjectMetadata.setLastModified(new Date(lastModifiedMillis));
s3ObjectMetadata.setContentEncoding("gzip");
when(s3FileSystem
.getAmazonS3Client()
@@ -376,6 +382,7 @@ public class S3FileSystemTest {
ImmutableList.of(
MatchResult.Metadata.builder()
.setSizeBytes(100)
+ .setLastModifiedMillis(lastModifiedMillis)
.setResourceId(path)
.setIsReadSeekEfficient(false)
.build())));
@@ -386,8 +393,10 @@ public class S3FileSystemTest {
S3FileSystem s3FileSystem = buildMockedS3FileSystem(s3Options());
S3ResourceId path = S3ResourceId.fromUri("s3://testbucket/testdirectory/filethatexists");
+ long lastModifiedMillis = 1540000000000L;
ObjectMetadata s3ObjectMetadata = new ObjectMetadata();
s3ObjectMetadata.setContentLength(100);
+ s3ObjectMetadata.setLastModified(new Date(lastModifiedMillis));
s3ObjectMetadata.setContentEncoding(null);
when(s3FileSystem
.getAmazonS3Client()
@@ -404,6 +413,7 @@ public class S3FileSystemTest {
ImmutableList.of(
MatchResult.Metadata.builder()
.setSizeBytes(100)
+ .setLastModifiedMillis(lastModifiedMillis)
.setResourceId(path)
.setIsReadSeekEfficient(true)
.build())));
@@ -488,12 +498,14 @@ public class S3FileSystemTest {
firstMatch.setBucketName(path.getBucket());
firstMatch.setKey("foo/bar0baz");
firstMatch.setSize(100);
+ firstMatch.setLastModified(new Date(1540000000001L));
// Expected to not be returned; prefix matches, but substring after wildcard does not
S3ObjectSummary secondMatch = new S3ObjectSummary();
secondMatch.setBucketName(path.getBucket());
secondMatch.setKey("foo/bar1qux");
secondMatch.setSize(200);
+ secondMatch.setLastModified(new Date(1540000000002L));
// Expected first request returns continuation token
ListObjectsV2Result firstResult = new ListObjectsV2Result();
@@ -517,6 +529,7 @@ public class S3FileSystemTest {
thirdMatch.setBucketName(path.getBucket());
thirdMatch.setKey("foo/bar2baz");
thirdMatch.setSize(300);
+ thirdMatch.setLastModified(new Date(1540000000003L));
// Expected second request returns third prefix match and no continuation token
ListObjectsV2Result secondResult = new ListObjectsV2Result();
@@ -542,6 +555,7 @@ public class S3FileSystemTest {
S3ResourceId.fromComponents(
firstMatch.getBucketName(), firstMatch.getKey()))
.setSizeBytes(firstMatch.getSize())
+ .setLastModifiedMillis(firstMatch.getLastModified().getTime())
.build(),
MatchResult.Metadata.builder()
.setIsReadSeekEfficient(true)
@@ -549,6 +563,7 @@ public class S3FileSystemTest {
S3ResourceId.fromComponents(
thirdMatch.getBucketName(), thirdMatch.getKey()))
.setSizeBytes(thirdMatch.getSize())
+ .setLastModifiedMillis(thirdMatch.getLastModified().getTime())
.build())));
}
@@ -569,12 +584,14 @@ public class S3FileSystemTest {
firstMatch.setBucketName(path.getBucket());
firstMatch.setKey("foo/bar\\baz0");
firstMatch.setSize(100);
+ firstMatch.setLastModified(new Date(1540000000001L));
// Expected to not be returned; prefix matches, but substring after wildcard does not
S3ObjectSummary secondMatch = new S3ObjectSummary();
secondMatch.setBucketName(path.getBucket());
secondMatch.setKey("foo/bar/baz1");
secondMatch.setSize(200);
+ secondMatch.setLastModified(new Date(1540000000002L));
// Expected first request returns continuation token
ListObjectsV2Result result = new ListObjectsV2Result();
@@ -600,6 +617,7 @@ public class S3FileSystemTest {
S3ResourceId.fromComponents(
firstMatch.getBucketName(), firstMatch.getKey()))
.setSizeBytes(firstMatch.getSize())
+ .setLastModifiedMillis(firstMatch.getLastModified().getTime())
.build())));
}
@@ -636,6 +654,7 @@ public class S3FileSystemTest {
S3ResourceId pathExist = S3ResourceId.fromUri("s3://testbucket/testdirectory/filethatexists");
ObjectMetadata s3ObjectMetadata = new ObjectMetadata();
s3ObjectMetadata.setContentLength(100);
+ s3ObjectMetadata.setLastModified(new Date(1540000000000L));
s3ObjectMetadata.setContentEncoding("not-gzip");
when(s3FileSystem
.getAmazonS3Client()
@@ -651,6 +670,7 @@ public class S3FileSystemTest {
foundListObject.setBucketName(pathGlob.getBucket());
foundListObject.setKey("path/part-0");
foundListObject.setSize(200);
+ foundListObject.setLastModified(new Date(1541000000000L));
ListObjectsV2Result listObjectsResult = new ListObjectsV2Result();
listObjectsResult.setNextContinuationToken(null);
@@ -679,9 +699,10 @@ public class S3FileSystemTest {
MatchResultMatcher.create(MatchResult.Status.NOT_FOUND, new FileNotFoundException()),
MatchResultMatcher.create(
MatchResult.Status.ERROR, new IOException(forbiddenException)),
- MatchResultMatcher.create(100, pathExist, true),
+ MatchResultMatcher.create(100, 1540000000000L, pathExist, true),
MatchResultMatcher.create(
200,
+ 1541000000000L,
S3ResourceId.fromComponents(pathGlob.getBucket(), foundListObject.getKey()),
true)));
}
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
index 0bd556f..4eefc86 100644
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
@@ -102,6 +102,7 @@ class HadoopFileSystem extends FileSystem<HadoopResourceId> {
.setResourceId(new HadoopResourceId(uri))
.setIsReadSeekEfficient(true)
.setSizeBytes(fileStatus.getLen())
+ .setLastModifiedMillis(fileStatus.getModificationTime())
.build());
}
}
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
index 782e75c..acb550f 100644
--- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
@@ -158,6 +158,7 @@ public class HadoopFileSystemTest {
.setResourceId(testPath("testFileB"))
.setIsReadSeekEfficient(true)
.setSizeBytes("testDataB".getBytes(StandardCharsets.UTF_8).length)
+ .setLastModifiedMillis(lastModified("testFileB"))
.build()))));
}
@@ -188,11 +189,13 @@ public class HadoopFileSystemTest {
.setResourceId(testPath("testFileAA"))
.setIsReadSeekEfficient(true)
.setSizeBytes("testDataAA".getBytes(StandardCharsets.UTF_8).length)
+ .setLastModifiedMillis(lastModified("testFileAA"))
.build(),
Metadata.builder()
.setResourceId(testPath("testFileA"))
.setIsReadSeekEfficient(true)
.setSizeBytes("testDataA".getBytes(StandardCharsets.UTF_8).length)
+ .setLastModifiedMillis(lastModified("testFileA"))
.build()));
}
@@ -223,6 +226,7 @@ public class HadoopFileSystemTest {
.setResourceId(testPath("testFileAA"))
.setIsReadSeekEfficient(true)
.setSizeBytes("testDataAA".getBytes(StandardCharsets.UTF_8).length)
+ .setLastModifiedMillis(lastModified("testFileAA"))
.build())),
MatchResult.create(Status.NOT_FOUND, ImmutableList.of()),
MatchResult.create(
@@ -232,6 +236,7 @@ public class HadoopFileSystemTest {
.setResourceId(testPath("testFileBB"))
.setIsReadSeekEfficient(true)
.setSizeBytes("testDataBB".getBytes(StandardCharsets.UTF_8).length)
+ .setLastModifiedMillis(lastModified("testFileBB"))
.build())));
assertThat(matchResults, equalTo(expected));
}
@@ -258,11 +263,13 @@ public class HadoopFileSystemTest {
.setResourceId(testPath("renameFileA"))
.setIsReadSeekEfficient(true)
.setSizeBytes("testDataA".getBytes(StandardCharsets.UTF_8).length)
+ .setLastModifiedMillis(lastModified("renameFileA"))
.build(),
Metadata.builder()
.setResourceId(testPath("renameFileB"))
.setIsReadSeekEfficient(true)
.setSizeBytes("testDataB".getBytes(StandardCharsets.UTF_8).length)
+ .setLastModifiedMillis(lastModified("renameFileB"))
.build()));
// ensure files exist
@@ -377,6 +384,13 @@ public class HadoopFileSystemTest {
}
}
+ private long lastModified(String relativePath) throws Exception {
+ return fileSystem
+ .fileSystem
+ .getFileStatus(testPath(relativePath).toPath())
+ .getModificationTime();
+ }
+
private HadoopResourceId testPath(String relativePath) {
return new HadoopResourceId(hdfsClusterBaseUri.resolve(relativePath));
}