Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/01/05 07:25:57 UTC

[GitHub] [iceberg] jackye1995 commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

jackye1995 commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r778592843



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -132,6 +145,11 @@ public void write(int b) throws IOException {
     }
 
     stream.write(b);
+    if (isEtagCheckEnabled) {
+      byte byteValue = ((Integer) b).byteValue();
+      currentPartMessageDigest.update(byteValue);
+      completeMessageDigest.update(byteValue);
+    }

Review comment:
       nit: newline after if block

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -74,13 +77,17 @@
   private final AwsProperties awsProperties;
 
   private CountingOutputStream stream;
-  private final List<File> stagingFiles = Lists.newArrayList();
+  private final List<FileWithEtag> stagingFilesWithETags = Lists.newArrayList();

Review comment:
       I think we can avoid changing this variable name because the class name is already self-explanatory

##########
File path: aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
##########
@@ -209,6 +209,12 @@
    */
   public static final String CLIENT_ASSUME_ROLE_REGION = "client.assume-role.region";
 
+  /**
+   * Enables eTag checks for S3 PUT and MULTIPART upload requests.
+   */
+  public static final String CLIENT_ENABLE_ETAG_CHECK = "client.enable.etag-check";

Review comment:
       since this is only for S3, I think it makes more sense to use an `s3` prefix. Also, the naming convention for this type of config is `xxx.enabled` (see, for example, `SPARK_WRITE_PARTITIONED_FANOUT_ENABLED`)
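   As an illustration of the suggested convention, a renamed constant might look like the sketch below. The exact name is hypothetical; the property the PR eventually adopts may differ.
   
   ```java
   // Hypothetical rename sketch following the review suggestion:
   // an s3-scoped prefix plus an "enabled" suffix, in line with existing
   // configs such as SPARK_WRITE_PARTITIONED_FANOUT_ENABLED.
   public class AwsPropertiesNamingSketch {
     public static final String S3_CHECKSUM_ENABLED = "s3.checksum.enabled";
   
     public static void main(String[] args) {
       System.out.println(S3_CHECKSUM_ENABLED);
     }
   }
   ```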

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -178,7 +204,12 @@ private void newStream() throws IOException {
     createStagingDirectoryIfNotExists();
     currentStagingFile = File.createTempFile("s3fileio-", ".tmp", stagingDirectory);
     currentStagingFile.deleteOnExit();
-    stagingFiles.add(currentStagingFile);
+    try {
+      currentPartMessageDigest = isEtagCheckEnabled ? MessageDigest.getInstance(digestAlgorithm) : null;
+    } catch (NoSuchAlgorithmException e) {
+      throw new IOException(e);
+    }
+    stagingFilesWithETags.add(new FileWithEtag(currentStagingFile, currentPartMessageDigest));

Review comment:
       nit: newline after try block

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -74,13 +77,17 @@
   private final AwsProperties awsProperties;
 
   private CountingOutputStream stream;
-  private final List<File> stagingFiles = Lists.newArrayList();
+  private final List<FileWithEtag> stagingFilesWithETags = Lists.newArrayList();
   private final File stagingDirectory;
   private File currentStagingFile;
   private String multipartUploadId;
   private final Map<File, CompletableFuture<CompletedPart>> multiPartMap = Maps.newHashMap();
   private final int multiPartSize;
   private final int multiPartThresholdSize;
+  private static final String digestAlgorithm = "MD5";

Review comment:
       nit: static variables should be declared above all non-static ones

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -216,43 +247,43 @@ private void uploadParts() {
       return;
     }
 
-    stagingFiles.stream()
+    stagingFilesWithETags.stream()
         // do not upload the file currently being written
-        .filter(f -> closed || !f.equals(currentStagingFile))
+        .filter(f -> closed || !f.file().equals(currentStagingFile))
         // do not upload any files that have already been processed
-        .filter(Predicates.not(multiPartMap::containsKey))
+        .filter(Predicates.not(f -> multiPartMap.containsKey(f.file())))
         .forEach(f -> {
           UploadPartRequest.Builder requestBuilder = UploadPartRequest.builder()
               .bucket(location.bucket())
               .key(location.key())
               .uploadId(multipartUploadId)
-              .partNumber(stagingFiles.indexOf(f) + 1)
-              .contentLength(f.length());
+              .partNumber(stagingFilesWithETags.indexOf(f) + 1)
+              .contentLength(f.file().length());
 
           S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
 
           UploadPartRequest uploadRequest = requestBuilder.build();
 
           CompletableFuture<CompletedPart> future = CompletableFuture.supplyAsync(
               () -> {
-                UploadPartResponse response = s3.uploadPart(uploadRequest, RequestBody.fromFile(f));
+                UploadPartResponse response = s3.uploadPart(uploadRequest, RequestBody.fromFile(f.file()));
+                checkEtag(f.eTag(), response.eTag());

Review comment:
       S3 has built-in checksum verification using the `Content-MD5` request header, so I think we don't need to check the response on the client side.
   
   See https://aws.amazon.com/premiumsupport/knowledge-center/data-integrity-s3/ for more details
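   The `Content-MD5` approach can be sketched as follows: the client computes the Base64-encoded MD5 of the part locally and sends it with the request, and S3 fails the upload server-side on a mismatch. This is an illustrative standalone example, not code from the PR; the class and method names are hypothetical.
   
   ```java
   import java.security.MessageDigest;
   import java.security.NoSuchAlgorithmException;
   import java.util.Base64;
   
   // Illustrative sketch of the Content-MD5 flow (names are hypothetical).
   public class ContentMd5Sketch {
   
     // Base64-encoded MD5 digest, the format S3 expects in the Content-MD5 header.
     static String computeContentMd5(byte[] partBytes) {
       try {
         MessageDigest md5 = MessageDigest.getInstance("MD5");
         return Base64.getEncoder().encodeToString(md5.digest(partBytes));
       } catch (NoSuchAlgorithmException e) {
         // MD5 is a mandatory JDK algorithm, so this should not happen.
         throw new IllegalStateException("MD5 not available", e);
       }
     }
   
     public static void main(String[] args) {
       String contentMd5 = computeContentMd5("hello".getBytes());
       // With the AWS SDK v2 this value would be set on the request builder,
       // e.g. UploadPartRequest.builder()...contentMD5(contentMd5).build(),
       // and S3 rejects the part if the received bytes do not match the digest.
       System.out.println(contentMd5);  // XUFAKrxLKna5cZ2REBfFkg==
     }
   }
   ```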

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -153,6 +171,10 @@ public void write(byte[] b, int off, int len) throws IOException {
       int writeSize = multiPartSize - (int) stream.getCount();
 
       stream.write(b, relativeOffset, writeSize);
+      if (isEtagCheckEnabled) {
+        currentPartMessageDigest.update(b, relativeOffset, writeSize);
+        completeMessageDigest.update(b, relativeOffset, writeSize);
+      }

Review comment:
       nit: newline after if block
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
For additional commands, e-mail: issues-help@iceberg.apache.org