You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/01/03 21:31:45 UTC
[13/50] [abbrv] hadoop git commit: HADOOP-13282. S3 blob etags to be
made visible in S3A status/getFileChecksum() calls. Contributed by Steve
Loughran
HADOOP-13282. S3 blob etags to be made visible in S3A status/getFileChecksum() calls.
Contributed by Steve Loughran
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c8ff0cc3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c8ff0cc3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c8ff0cc3
Branch: refs/heads/YARN-1011
Commit: c8ff0cc304f07bf793192291e0611b2fb4bcc4e3
Parents: ef450df
Author: Steve Loughran <st...@apache.org>
Authored: Thu Dec 21 14:58:58 2017 +0000
Committer: Steve Loughran <st...@apache.org>
Committed: Thu Dec 21 14:58:58 2017 +0000
----------------------------------------------------------------------
.../apache/hadoop/fs/store/EtagChecksum.java | 90 +++++++++++++
.../apache/hadoop/fs/store/package-info.java | 28 ++++
.../hadoop/fs/store/TestEtagChecksum.java | 85 ++++++++++++
.../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 40 ++++++
.../hadoop/fs/s3a/ITestS3AMiscOperations.java | 133 ++++++++++++++++---
5 files changed, 359 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8ff0cc3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/EtagChecksum.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/EtagChecksum.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/EtagChecksum.java
new file mode 100644
index 0000000..cc29f1b
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/EtagChecksum.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.store;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hadoop.fs.FileChecksum;
+
+/**
+ * An etag as a checksum.
+ * Consider these suitable for checking if an object has changed, but
+ * not suitable for comparing two different objects for equivalence,
+ * especially between object stores.
+ */
+public class EtagChecksum extends FileChecksum {
+
+ /** The algorithm name: {@value}. */
+ private static final String ETAG = "etag";
+
+ /**
+ * Etag string. May be null: getLength(), getBytes() and write() treat null as empty.
+ */
+ private String eTag = "";
+
+ /**
+ * Create with an empty etag.
+ */
+ public EtagChecksum() {
+ }
+
+ /**
+ * Create with a string etag.
+ * @param eTag etag; may be null, which is reported as a zero-length checksum
+ */
+ public EtagChecksum(String eTag) {
+ this.eTag = eTag;
+ }
+
+ @Override
+ public String getAlgorithmName() {
+ return ETAG;
+ }
+
+ @Override
+ public int getLength() {
+ // delegate to getBytes() so that a null etag yields 0 instead of an NPE
+ return getBytes().length;
+ }
+
+ @Override
+ public byte[] getBytes() {
+ return eTag != null
+ ? eTag.getBytes(StandardCharsets.UTF_8)
+ : new byte[0];
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(eTag != null ? eTag : "");
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ eTag = in.readUTF();
+ }
+
+ @Override
+ public String toString() {
+ return "etag: \"" + eTag + '"';
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8ff0cc3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/package-info.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/package-info.java
new file mode 100644
index 0000000..ebe1db4
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package is for classes to be shared across object stores; for internal
+ * use within the hadoop-* modules only. No stability guarantees.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.fs.store;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8ff0cc3/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestEtagChecksum.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestEtagChecksum.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestEtagChecksum.java
new file mode 100644
index 0000000..ef9613f
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestEtagChecksum.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.store;
+
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+
+/**
+ * Unit test of etag checksum equality and write/readFields round trips.
+ */
+public class TestEtagChecksum extends Assert {
+
+ private final EtagChecksum empty1 = tag("");
+ private final EtagChecksum empty2 = tag("");
+ private final EtagChecksum valid1 = tag("valid");
+ private final EtagChecksum valid2 = tag("valid");
+
+ @Test
+ public void testEmptyTagsEqual() {
+ assertEquals(empty1, empty2);
+ }
+
+ @Test
+ public void testEmptyTagRoundTrip() throws Throwable {
+ assertEquals(empty1, roundTrip(empty1));
+ }
+
+ @Test
+ public void testValidTagsEqual() {
+ assertEquals(valid1, valid2);
+ }
+
+ @Test
+ public void testValidTagRoundTrip() throws Throwable {
+ assertEquals(valid1, roundTrip(valid1));
+ }
+
+ @Test
+ public void testValidAndEmptyTagsDontMatch() {
+ assertNotEquals(valid1, empty1);
+ assertNotEquals(empty1, valid1);
+ }
+
+ @Test
+ public void testDifferentTagsDontMatch() {
+ assertNotEquals(valid1, tag("other valid one"));
+ }
+
+ private EtagChecksum tag(String t) {
+ return new EtagChecksum(t);
+ }
+
+ private EtagChecksum roundTrip(EtagChecksum tag) throws IOException {
+ try (DataOutputBuffer dob = new DataOutputBuffer();
+ DataInputBuffer dib = new DataInputBuffer()) {
+ tag.write(dob);
+ dib.reset(dob.getData(), dob.getLength());
+ EtagChecksum t2 = new EtagChecksum();
+ t2.readFields(dib);
+ return t2;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8ff0cc3/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index f461c9e..a8147ed 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -112,6 +112,7 @@ import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.fs.store.EtagChecksum;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.Progressable;
@@ -539,6 +540,14 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
}
/**
+ * Get the encryption algorithm of this endpoint.
+ * @return the encryption algorithm.
+ */
+ public S3AEncryptionMethods getServerSideEncryptionAlgorithm() {
+ return serverSideEncryptionAlgorithm;
+ }
+
+ /**
* Demand create the directory allocator, then create a temporary file.
* {@link LocalDirAllocator#createTmpFileForWrite(String, long, Configuration)}.
* @param pathStr prefix for the temporary file
@@ -1069,6 +1078,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
* @throws IOException IO and object access problems.
*/
@VisibleForTesting
+ @Retries.RetryRaw
public ObjectMetadata getObjectMetadata(Path path) throws IOException {
return getObjectMetadata(pathToKey(path));
}
@@ -2935,6 +2945,36 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
}
/**
+ * Get the etag of an object at the path via HEAD request and return it
+ * as a checksum object. This has whatever guarantees about equivalence
+ * the S3 implementation offers.
+ * <ol>
+ * <li>If a tag has not changed, consider the object unchanged.</li>
+ * <li>Two tags being different does not imply the data is different.</li>
+ * </ol>
+ * Different S3 implementations may offer different guarantees.
+ * @param f The file path
+ * @param length The length of the file range; must not be negative, otherwise unused
+ * @return The EtagChecksum, or null if the object metadata has no etag.
+ * @throws IOException IO failure
+ * @see <a href="http://docs.aws.amazon.com/AmazonS3/latest/API/RESTCommonResponseHeaders.html">Common Response Headers</a>
+ */
+
+ public EtagChecksum getFileChecksum(Path f, final long length)
+ throws IOException {
+ Preconditions.checkArgument(length >= 0);
+ Path path = qualify(f);
+ LOG.debug("getFileChecksum({})", path);
+ return once("getFileChecksum", path.toString(),
+ () -> {
+ // this always does a full HEAD to the object
+ ObjectMetadata headers = getObjectMetadata(path);
+ String eTag = headers.getETag();
+ return eTag != null ? new EtagChecksum(eTag) : null;
+ });
+ }
+
+ /**
* {@inheritDoc}.
*
* This implementation is optimized for S3, which can do a bulk listing
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8ff0cc3/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
index 869d64c..ddf2529 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
@@ -18,21 +18,24 @@
package org.apache.hadoop.fs.s3a;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.contract.ContractTestUtils;
-import org.apache.hadoop.test.LambdaTestUtils;
+import java.io.ByteArrayInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
-import com.amazonaws.services.s3.model.PutObjectResult;
+import org.junit.Assume;
import org.junit.Test;
-import java.io.ByteArrayInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.concurrent.Callable;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.store.EtagChecksum;
+import org.apache.hadoop.test.LambdaTestUtils;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
/**
* Tests of the S3A FileSystem which don't have a specific home and can share
@@ -40,6 +43,8 @@ import java.util.concurrent.Callable;
*/
public class ITestS3AMiscOperations extends AbstractS3ATestBase {
+ private static final byte[] HELLO = "hello".getBytes(StandardCharsets.UTF_8);
+
@Test
public void testCreateNonRecursiveSuccess() throws IOException {
Path shouldWork = path("nonrecursivenode");
@@ -58,7 +63,7 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase {
@Test(expected = FileAlreadyExistsException.class)
public void testCreateNonRecursiveParentIsFile() throws IOException {
Path parent = path("/file.txt");
- ContractTestUtils.touch(getFileSystem(), parent);
+ touch(getFileSystem(), parent);
createNonRecursive(new Path(parent, "fail"));
}
@@ -73,12 +78,7 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase {
new ByteArrayInputStream("PUT".getBytes()),
metadata);
LambdaTestUtils.intercept(IllegalStateException.class,
- new Callable<PutObjectResult>() {
- @Override
- public PutObjectResult call() throws Exception {
- return fs.putObjectDirect(put);
- }
- });
+ () -> fs.putObjectDirect(put));
assertPathDoesNotExist("put object was created", path);
}
@@ -87,4 +87,103 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase {
(short) 3, (short) 4096,
null);
}
+
+ /**
+ * Touch a path, return the full path.
+ * @param name relative name
+ * @return the path
+ * @throws IOException IO failure
+ */
+ Path touchFile(String name) throws IOException {
+ Path path = path(name);
+ touch(getFileSystem(), path);
+ return path;
+ }
+
+ /**
+ * Create a file with the data, return the path.
+ * @param name relative name
+ * @param data data to write
+ * @return the path
+ * @throws IOException IO failure
+ */
+ Path mkFile(String name, byte[] data) throws IOException {
+ final Path f = path(name);
+ createFile(getFileSystem(), f, true, data);
+ return f;
+ }
+
+ /**
+ * The assumption here is that 0-byte files uploaded in a single PUT
+ * always have the same checksum, including stores with encryption.
+ * @throws Throwable on a failure
+ */
+ @Test
+ public void testEmptyFileChecksums() throws Throwable {
+ final S3AFileSystem fs = getFileSystem();
+ Path file1 = touchFile("file1");
+ EtagChecksum checksum1 = fs.getFileChecksum(file1, 0);
+ LOG.info("Checksum for {}: {}", file1, checksum1);
+ assertNotNull("file 1 checksum", checksum1);
+ assertNotEquals("file 1 checksum", 0, checksum1.getLength());
+ assertEquals("checksums", checksum1,
+ fs.getFileChecksum(touchFile("file2"), 0));
+ }
+
+ /**
+ * Verify that different file contents have different
+ * checksums, and that they aren't the same as the empty file.
+ * @throws Throwable failure
+ */
+ @Test
+ public void testNonEmptyFileChecksums() throws Throwable {
+ final S3AFileSystem fs = getFileSystem();
+ final Path file3 = mkFile("file3", HELLO);
+ final EtagChecksum checksum1 = fs.getFileChecksum(file3, 0);
+ assertNotNull("file 3 checksum", checksum1);
+ final Path file4 = touchFile("file4");
+ final EtagChecksum checksum2 = fs.getFileChecksum(file4, 0);
+ assertNotEquals("checksums", checksum1, checksum2);
+ // overwrite
+ createFile(fs, file4, true,
+ "hello, world".getBytes(StandardCharsets.UTF_8));
+ assertNotEquals(checksum2, fs.getFileChecksum(file4, 0));
+ }
+
+ /**
+ * Verify that on an unencrypted store, the checksum of two non-empty
+ * (single PUT) files is the same if the data is the same.
+ * This will fail if the bucket has S3 default encryption enabled.
+ * @throws Throwable failure
+ */
+ @Test
+ public void testNonEmptyFileChecksumsUnencrypted() throws Throwable {
+ Assume.assumeTrue(encryptionAlgorithm().equals(S3AEncryptionMethods.NONE));
+ final S3AFileSystem fs = getFileSystem();
+ final EtagChecksum checksum1 =
+ fs.getFileChecksum(mkFile("file5", HELLO), 0);
+ assertNotNull("file 5 checksum", checksum1);
+ assertEquals("checksums", checksum1,
+ fs.getFileChecksum(mkFile("file6", HELLO), 0));
+ }
+
+ private S3AEncryptionMethods encryptionAlgorithm() {
+ return getFileSystem().getServerSideEncryptionAlgorithm();
+ }
+
+ @Test
+ public void testNegativeLength() throws Throwable {
+ LambdaTestUtils.intercept(IllegalArgumentException.class,
+ () -> getFileSystem().getFileChecksum(mkFile("negative", HELLO), -1));
+ }
+
+ @Test
+ public void testLengthPastEOF() throws Throwable {
+ final S3AFileSystem fs = getFileSystem();
+ Path f = mkFile("file5", HELLO);
+ assertEquals(
+ fs.getFileChecksum(f, HELLO.length),
+ fs.getFileChecksum(f, HELLO.length * 2));
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org