You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/08/22 13:58:00 UTC

[jira] [Work logged] (BEAM-5196) Add MD5 consistency check on S3 uploads (writes)

     [ https://issues.apache.org/jira/browse/BEAM-5196?focusedWorklogId=136953&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-136953 ]

ASF GitHub Bot logged work on BEAM-5196:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/Aug/18 13:57
            Start Date: 22/Aug/18 13:57
    Worklog Time Spent: 10m 
      Work Description: iemejia closed pull request #6232: [BEAM-5196]  Add MD5 consistency check on S3 uploads (writes)
URL: https://github.com/apache/beam/pull/6232
 
 
   

This is a PR merged from a forked repository.
Because GitHub does not display the original diff of a merged pull request
that came from a fork, the diff is reproduced below for the sake of provenance:

diff --git a/sdks/java/io/amazon-web-services/build.gradle b/sdks/java/io/amazon-web-services/build.gradle
index cbcc97df0ce..973d115dbf2 100644
--- a/sdks/java/io/amazon-web-services/build.gradle
+++ b/sdks/java/io/amazon-web-services/build.gradle
@@ -42,4 +42,5 @@ dependencies {
   shadowTest library.java.mockito_core
   shadowTest library.java.junit
   shadowTest library.java.slf4j_jdk14
+  testCompile "io.findify:s3mock_2.11:0.2.4"
 }
diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3WritableByteChannel.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3WritableByteChannel.java
index 6c1fc7188f8..c061adc527b 100644
--- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3WritableByteChannel.java
+++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3WritableByteChannel.java
@@ -29,12 +29,15 @@
 import com.amazonaws.services.s3.model.PartETag;
 import com.amazonaws.services.s3.model.UploadPartRequest;
 import com.amazonaws.services.s3.model.UploadPartResult;
+import com.amazonaws.util.Base64;
 import com.google.common.annotations.VisibleForTesting;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.WritableByteChannel;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.io.aws.options.S3Options;
@@ -53,6 +56,7 @@
   // AWS S3 parts are 1-indexed, not zero-indexed.
   private int partNumber = 1;
   private boolean open = true;
+  private final MessageDigest md5 = md5();
 
   S3WritableByteChannel(AmazonS3 amazonS3, S3ResourceId path, String contentType, S3Options options)
       throws IOException {
@@ -95,6 +99,14 @@
     uploadId = result.getUploadId();
   }
 
+  private static MessageDigest md5() {
+    try {
+      return MessageDigest.getInstance("MD5");
+    } catch (NoSuchAlgorithmException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
   @Override
   public int write(ByteBuffer sourceBuffer) throws IOException {
     if (!isOpen()) {
@@ -109,6 +121,7 @@ public int write(ByteBuffer sourceBuffer) throws IOException {
       byte[] copyBuffer = new byte[bytesWritten];
       sourceBuffer.get(copyBuffer);
       uploadBuffer.put(copyBuffer);
+      md5.update(copyBuffer);
 
       if (!uploadBuffer.hasRemaining() || sourceBuffer.hasRemaining()) {
         flush();
@@ -129,6 +142,7 @@ private void flush() throws IOException {
             .withUploadId(uploadId)
             .withPartNumber(partNumber++)
             .withPartSize(uploadBuffer.remaining())
+            .withMD5Digest(Base64.encodeAsString(md5.digest()))
             .withInputStream(inputStream);
     request.setSSECustomerKey(options.getSSECustomerKey());
 
@@ -139,6 +153,7 @@ private void flush() throws IOException {
       throw new IOException(e);
     }
     uploadBuffer.clear();
+    md5.reset();
     eTags.add(result.getPartETag());
   }
 
diff --git a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java
index 63f69b55c0d..0abf2170a36 100644
--- a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java
+++ b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java
@@ -22,9 +22,11 @@
 import static org.apache.beam.sdk.io.aws.s3.S3TestUtils.getSSECustomerKeyMd5;
 import static org.apache.beam.sdk.io.aws.s3.S3TestUtils.s3Options;
 import static org.apache.beam.sdk.io.aws.s3.S3TestUtils.s3OptionsWithSSECustomerKey;
+import static org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions.builder;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.anyObject;
@@ -35,6 +37,12 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import akka.http.scaladsl.Http;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.AnonymousAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
 import com.amazonaws.services.s3.model.CopyObjectRequest;
@@ -50,12 +58,18 @@
 import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.google.common.collect.ImmutableList;
+import io.findify.s3mock.S3Mock;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.io.aws.options.S3Options;
 import org.apache.beam.sdk.io.fs.MatchResult;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -64,6 +78,29 @@
 /** Test case for {@link S3FileSystem}. */
 @RunWith(JUnit4.class)
 public class S3FileSystemTest {
+  private static S3Mock api;
+  private static AmazonS3 client;
+
+  @BeforeClass
+  public static void beforeClass() {
+    api = new S3Mock.Builder().withInMemoryBackend().build();
+    Http.ServerBinding binding = api.start();
+
+    EndpointConfiguration endpoint =
+        new EndpointConfiguration(
+            "http://localhost:" + binding.localAddress().getPort(), "us-west-2");
+    client =
+        AmazonS3ClientBuilder.standard()
+            .withPathStyleAccessEnabled(true)
+            .withEndpointConfiguration(endpoint)
+            .withCredentials(new AWSStaticCredentialsProvider(new AnonymousAWSCredentials()))
+            .build();
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    api.stop();
+  }
 
   @Test
   public void testGlobTranslation() {
@@ -638,6 +675,34 @@ public void matchVariousInvokeThreadPool() throws IOException {
                 true)));
   }
 
+  @Test
+  public void testWriteAndRead() throws IOException {
+    S3FileSystem s3FileSystem = buildMockedS3FileSystem(s3Options(), client);
+
+    client.createBucket("testbucket");
+
+    byte[] writtenArray = new byte[] {0};
+    ByteBuffer bb = ByteBuffer.allocate(writtenArray.length);
+    bb.put(writtenArray);
+
+    //First create an object and write data to it
+    S3ResourceId path = S3ResourceId.fromUri("s3://testbucket/foo/bar.txt");
+    WritableByteChannel writableByteChannel =
+        s3FileSystem.create(path, builder().setMimeType("application/text").build());
+    writableByteChannel.write(bb);
+    writableByteChannel.close();
+
+    //Now read the same object
+    ByteBuffer bb2 = ByteBuffer.allocate(writtenArray.length);
+    ReadableByteChannel open = s3FileSystem.open(path);
+    open.read(bb2);
+
+    //And compare the content with the one that was written
+    byte[] readArray = bb2.array();
+    assertArrayEquals(readArray, writtenArray);
+    open.close();
+  }
+
   /** A mockito argument matcher to implement equality on GetObjectMetadataRequest. */
   private static class GetObjectMetadataRequestMatcher
       extends ArgumentMatcher<GetObjectMetadataRequest> {
diff --git a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3TestUtils.java b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3TestUtils.java
index 52fc1af88bd..fdd6733411b 100644
--- a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3TestUtils.java
+++ b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3TestUtils.java
@@ -66,8 +66,12 @@ static S3Options s3OptionsWithMultipleSSEOptions() {
   }
 
   static S3FileSystem buildMockedS3FileSystem(S3Options options) {
+    return buildMockedS3FileSystem(options, Mockito.mock(AmazonS3.class));
+  }
+
+  static S3FileSystem buildMockedS3FileSystem(S3Options options, AmazonS3 client) {
     S3FileSystem s3FileSystem = new S3FileSystem(options);
-    s3FileSystem.setAmazonS3Client(Mockito.mock(AmazonS3.class));
+    s3FileSystem.setAmazonS3Client(client);
     return s3FileSystem;
   }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to this message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 136953)
    Time Spent: 20m  (was: 10m)

> Add MD5 consistency check on S3 uploads (writes)
> ------------------------------------------------
>
>                 Key: BEAM-5196
>                 URL: https://issues.apache.org/jira/browse/BEAM-5196
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-aws
>    Affects Versions: 2.7.0
>            Reporter: Ismaël Mejía
>            Assignee: Leen Toelen
>            Priority: Minor
>          Time Spent: 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)