Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/12/28 01:43:50 UTC

[GitHub] [iceberg] SinghAsDev opened a new pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

SinghAsDev opened a new pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813


   This PR adds the capability to validate data uploads with checksums using S3 eTags. This should help catch hardware issues, such as corrupt disks, that may affect the staged files generated while uploading data to S3.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] SinghAsDev commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
SinghAsDev commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r788269610



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -218,41 +254,49 @@ private void uploadParts() {
 
     stagingFiles.stream()
         // do not upload the file currently being written
-        .filter(f -> closed || !f.equals(currentStagingFile))
+        .filter(f -> closed || !f.file().equals(currentStagingFile))
         // do not upload any files that have already been processed
-        .filter(Predicates.not(multiPartMap::containsKey))
+        .filter(Predicates.not(f -> multiPartMap.containsKey(f.file())))
         .forEach(f -> {
           UploadPartRequest.Builder requestBuilder = UploadPartRequest.builder()
               .bucket(location.bucket())
               .key(location.key())
               .uploadId(multipartUploadId)
               .partNumber(stagingFiles.indexOf(f) + 1)
-              .contentLength(f.length());
+              .contentLength(f.file().length());

Review comment:
       This leads to a LocalVariableName checkstyle failure, so I will have to rename `f` to a meaningful name anyway.






[GitHub] [iceberg] SinghAsDev commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
SinghAsDev commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r789055918



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -74,13 +78,16 @@
   private final AwsProperties awsProperties;
 
   private CountingOutputStream stream;
-  private final List<File> stagingFiles = Lists.newArrayList();
+  private final List<FileAndDigest> stagingFiles = Lists.newArrayList();
   private final File stagingDirectory;
   private File currentStagingFile;
   private String multipartUploadId;
   private final Map<File, CompletableFuture<CompletedPart>> multiPartMap = Maps.newHashMap();
   private final int multiPartSize;
   private final int multiPartThresholdSize;
+  private final boolean isEtagCheckEnabled;
+  private final MessageDigest completeMessageDigest;
+  private MessageDigest currentPartMessageDigest;

Review comment:
       Thanks @danielcweeks for the review. Updated the diff.






[GitHub] [iceberg] danielcweeks commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r788357730



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -74,13 +78,16 @@
   private final AwsProperties awsProperties;
 
   private CountingOutputStream stream;
-  private final List<File> stagingFiles = Lists.newArrayList();
+  private final List<FileAndDigest> stagingFiles = Lists.newArrayList();
   private final File stagingDirectory;
   private File currentStagingFile;
   private String multipartUploadId;
   private final Map<File, CompletableFuture<CompletedPart>> multiPartMap = Maps.newHashMap();
   private final int multiPartSize;
   private final int multiPartThresholdSize;
+  private final boolean isEtagCheckEnabled;
+  private final MessageDigest completeMessageDigest;
+  private MessageDigest currentPartMessageDigest;

Review comment:
       If we do want to calculate the full stream, it looks like it may be possible to wrap it twice: once for the individual part streams where we reset the digest every time, and a second wrapped stream where we reuse the digest across each file.
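The double-wrapping idea in the comment above could be sketched with `java.security.DigestOutputStream`. This is a minimal sketch under stated assumptions, not the PR's code: the class `DoubleDigestSketch` and its method names are hypothetical, and MD5 is assumed because the diff in this thread uses it.

```java
import java.io.OutputStream;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: tracks one digest per part and one digest for the whole stream.
final class DoubleDigestSketch {
  private final MessageDigest partDigest;
  private final MessageDigest completeDigest;
  private final List<byte[]> partDigests = new ArrayList<>();

  DoubleDigestSketch() throws NoSuchAlgorithmException {
    this.partDigest = MessageDigest.getInstance("MD5");
    this.completeDigest = MessageDigest.getInstance("MD5");
  }

  // Wrap the sink twice: the inner wrapper updates the part digest,
  // the outer wrapper updates the digest that spans every part.
  OutputStream wrap(OutputStream sink) {
    return new DigestOutputStream(new DigestOutputStream(sink, partDigest), completeDigest);
  }

  // Call when a part is finished. digest() returns the part's MD5 and
  // resets partDigest, so the same instance can be reused for the next part.
  void finishPart() {
    partDigests.add(partDigest.digest());
  }

  List<byte[]> partDigests() {
    return partDigests;
  }

  // Call once, after the last write; this also resets the complete digest.
  byte[] completeDigest() {
    return completeDigest.digest();
  }
}
```

In this sketch both digests are updated at write time, so any buffering would have to sit below the wrapped streams (closer to the file) rather than above them.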






[GitHub] [iceberg] SinghAsDev commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
SinghAsDev commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r789156713



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -74,13 +79,16 @@
   private final AwsProperties awsProperties;
 
   private CountingOutputStream stream;
-  private final List<File> stagingFiles = Lists.newArrayList();
+  private final List<FileAndDigest> stagingFiles = Lists.newArrayList();
   private final File stagingDirectory;
   private File currentStagingFile;
   private String multipartUploadId;
   private final Map<File, CompletableFuture<CompletedPart>> multiPartMap = Maps.newHashMap();
   private final int multiPartSize;
   private final int multiPartThresholdSize;
+  private final boolean isEtagCheckEnabled;
+  private final MessageDigest completeMessageDigest;
+  private final MessageDigest currentPartMessageDigest;

Review comment:
       We need both to handle the case where the number of bytes written is larger than the multipart size but less than the multipart threshold. In that case we will have two files, and therefore two part message digests, whereas the put request needs the digest of the whole message.
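The case described above can be illustrated with a small sketch. The class `UploadPlanSketch`, its method names, and the exact comparison against the threshold are assumptions made for illustration; the real `S3OutputStream` logic may differ in detail.

```java
// Hypothetical sketch: shows why a stream can span two staging files
// and still be sent as one put request that needs the whole-stream digest.
final class UploadPlanSketch {
  enum Plan { SINGLE_PUT_WITH_COMPLETE_DIGEST, MULTIPART_WITH_PART_DIGESTS }

  // Simplification: a new staging file is started every multiPartSize bytes (ceiling division).
  static long stagingFileCount(long bytesWritten, long multiPartSize) {
    return (bytesWritten + multiPartSize - 1) / multiPartSize;
  }

  // Assumption: below the threshold the data goes out in a single put request.
  static Plan plan(long bytesWritten, long multiPartThresholdSize) {
    return bytesWritten < multiPartThresholdSize
        ? Plan.SINGLE_PUT_WITH_COMPLETE_DIGEST
        : Plan.MULTIPART_WITH_PART_DIGESTS;
  }
}
```

For example, with a 5 MB part size and an 8 MB threshold, 6 MB of written data yields two staging files but a single put request, so the per-part digests alone are not enough.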






[GitHub] [iceberg] SinghAsDev commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
SinghAsDev commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r789840483



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -218,41 +241,50 @@ private void uploadParts() {
 
     stagingFiles.stream()
         // do not upload the file currently being written
-        .filter(f -> closed || !f.equals(currentStagingFile))
+        .filter(f -> closed || !f.file().equals(currentStagingFile))
         // do not upload any files that have already been processed
-        .filter(Predicates.not(multiPartMap::containsKey))
-        .forEach(f -> {
+        .filter(Predicates.not(f -> multiPartMap.containsKey(f.file())))
+        .forEach(fileAndDigest -> {
+          File file = fileAndDigest.file();

Review comment:
       That's fine with me. I'm trying to understand the rationale, though. Is it to avoid merge conflicts for folks who may be doing a cherry-pick?






[GitHub] [iceberg] SinghAsDev commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
SinghAsDev commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r789195309



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -172,15 +187,23 @@ public void write(byte[] b, int off, int len) throws IOException {
 
   private void newStream() throws IOException {
     if (stream != null) {
-      stream.close();
+      closeStream();
     }
 
     createStagingDirectoryIfNotExists();
     currentStagingFile = File.createTempFile("s3fileio-", ".tmp", stagingDirectory);
     currentStagingFile.deleteOnExit();
-    stagingFiles.add(currentStagingFile);
 
-    stream = new CountingOutputStream(new BufferedOutputStream(new FileOutputStream(currentStagingFile)));
+    stagingFiles.add(new FileAndDigest(currentStagingFile));
+
+    if (isEtagCheckEnabled) {
+      currentPartMessageDigest.reset();

Review comment:
       @danielcweeks @rdblue either we can create a new MessageDigest for each file and use MessageDigest in `FileAndDigest`, or we can use bytes. The former is likely more readable and the latter is likely more memory efficient. I feel memory efficiency is not the biggest concern here, so I am happy to go back to what I had originally. However, let me know what you folks think.






[GitHub] [iceberg] danielcweeks commented on pull request #3813: AWS: Support checksum validation with S3 eTags

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#issuecomment-1021596588


   Thanks @SinghAsDev !!




[GitHub] [iceberg] jackye1995 commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r778592843



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -132,6 +145,11 @@ public void write(int b) throws IOException {
     }
 
     stream.write(b);
+    if (isEtagCheckEnabled) {
+      byte byteValue = ((Integer) b).byteValue();
+      currentPartMessageDigest.update(byteValue);
+      completeMessageDigest.update(byteValue);
+    }

Review comment:
       nit: newline after if block

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -74,13 +77,17 @@
   private final AwsProperties awsProperties;
 
   private CountingOutputStream stream;
-  private final List<File> stagingFiles = Lists.newArrayList();
+  private final List<FileWithEtag> stagingFilesWithETags = Lists.newArrayList();

Review comment:
       I think we can avoid changing this variable name because the class name is already self-explanatory

##########
File path: aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
##########
@@ -209,6 +209,12 @@
    */
   public static final String CLIENT_ASSUME_ROLE_REGION = "client.assume-role.region";
 
+  /**
+   * Enables eTag checks for S3 PUT and MULTIPART upload requests.
+   */
+  public static final String CLIENT_ENABLE_ETAG_CHECK = "client.enable.etag-check";

Review comment:
       Since this is only for S3, I think it makes more sense to use the `s3` prefix. Also, the naming convention for this type of config is `xxx.enabled` (see, for example, `SPARK_WRITE_PARTITIONED_FANOUT_ENABLED`).
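A property that follows the suggested `s3` prefix and `enabled` suffix might be read as in the sketch below. The key string, the constant names, and the default of `false` are assumptions for illustration, not values confirmed by this thread.

```java
import java.util.Map;

// Hypothetical sketch of reading a boolean feature flag from a properties map.
final class ChecksumConfigSketch {
  // Assumed key: uses the "s3" prefix and ends in "enabled", as the review suggests.
  static final String S3_CHECKSUM_ENABLED = "s3.checksum-enabled";
  // Assumed default: the check is opt-in.
  static final boolean S3_CHECKSUM_ENABLED_DEFAULT = false;

  static boolean isChecksumEnabled(Map<String, String> properties) {
    String value = properties.get(S3_CHECKSUM_ENABLED);
    // Fall back to the default when the property is absent.
    return value == null ? S3_CHECKSUM_ENABLED_DEFAULT : Boolean.parseBoolean(value);
  }
}
```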

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -178,7 +204,12 @@ private void newStream() throws IOException {
     createStagingDirectoryIfNotExists();
     currentStagingFile = File.createTempFile("s3fileio-", ".tmp", stagingDirectory);
     currentStagingFile.deleteOnExit();
-    stagingFiles.add(currentStagingFile);
+    try {
+      currentPartMessageDigest = isEtagCheckEnabled ? MessageDigest.getInstance(digestAlgorithm) : null;
+    } catch (NoSuchAlgorithmException e) {
+      throw new IOException(e);
+    }
+    stagingFilesWithETags.add(new FileWithEtag(currentStagingFile, currentPartMessageDigest));

Review comment:
       nit: newline after try block

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -74,13 +77,17 @@
   private final AwsProperties awsProperties;
 
   private CountingOutputStream stream;
-  private final List<File> stagingFiles = Lists.newArrayList();
+  private final List<FileWithEtag> stagingFilesWithETags = Lists.newArrayList();
   private final File stagingDirectory;
   private File currentStagingFile;
   private String multipartUploadId;
   private final Map<File, CompletableFuture<CompletedPart>> multiPartMap = Maps.newHashMap();
   private final int multiPartSize;
   private final int multiPartThresholdSize;
+  private static final String digestAlgorithm = "MD5";

Review comment:
       nit: static variable should be above all other non-static ones

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -216,43 +247,43 @@ private void uploadParts() {
       return;
     }
 
-    stagingFiles.stream()
+    stagingFilesWithETags.stream()
         // do not upload the file currently being written
-        .filter(f -> closed || !f.equals(currentStagingFile))
+        .filter(f -> closed || !f.file().equals(currentStagingFile))
         // do not upload any files that have already been processed
-        .filter(Predicates.not(multiPartMap::containsKey))
+        .filter(Predicates.not(f -> multiPartMap.containsKey(f.file())))
         .forEach(f -> {
           UploadPartRequest.Builder requestBuilder = UploadPartRequest.builder()
               .bucket(location.bucket())
               .key(location.key())
               .uploadId(multipartUploadId)
-              .partNumber(stagingFiles.indexOf(f) + 1)
-              .contentLength(f.length());
+              .partNumber(stagingFilesWithETags.indexOf(f) + 1)
+              .contentLength(f.file().length());
 
           S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
 
           UploadPartRequest uploadRequest = requestBuilder.build();
 
           CompletableFuture<CompletedPart> future = CompletableFuture.supplyAsync(
               () -> {
-                UploadPartResponse response = s3.uploadPart(uploadRequest, RequestBody.fromFile(f));
+                UploadPartResponse response = s3.uploadPart(uploadRequest, RequestBody.fromFile(f.file()));
+                checkEtag(f.eTag(), response.eTag());

Review comment:
       S3 has built-in checksum verification using the request header `Content-MD5`. I don't think we need to check the response on the client side.
   
   See https://aws.amazon.com/premiumsupport/knowledge-center/data-integrity-s3/ for more details
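The `Content-MD5` header carries the base64 encoding of the raw 128-bit MD5 digest of the request body (not the hex string). A minimal sketch of computing that value with only the JDK follows; the class and method names are hypothetical, and the resulting string is what would be handed to a request builder's `contentMD5(...)` setter as in the later diff in this thread.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Hypothetical helper: computes a Content-MD5 header value for a request body.
final class ContentMd5Sketch {
  static String contentMd5(byte[] body) throws NoSuchAlgorithmException {
    // Raw 16-byte MD5 digest of the body.
    byte[] md5 = MessageDigest.getInstance("MD5").digest(body);
    // The header value is the base64 encoding of those raw bytes.
    return Base64.getEncoder().encodeToString(md5);
  }
}
```

For an empty body, `contentMd5(new byte[0])` returns `1B2M2Y8AsgTpgAmY7PhCfg==`. If the body S3 receives does not hash to the supplied value, S3 rejects the request, which is the server-side check the comment refers to.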

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -153,6 +171,10 @@ public void write(byte[] b, int off, int len) throws IOException {
       int writeSize = multiPartSize - (int) stream.getCount();
 
       stream.write(b, relativeOffset, writeSize);
+      if (isEtagCheckEnabled) {
+        currentPartMessageDigest.update(b, relativeOffset, writeSize);
+        completeMessageDigest.update(b, relativeOffset, writeSize);
+      }

Review comment:
       nit: newline after if block
   






[GitHub] [iceberg] SinghAsDev commented on pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
SinghAsDev commented on pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#issuecomment-1002179576


   @jackye1995, would you mind reviewing this when you get a chance?




[GitHub] [iceberg] kbendick commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r777782594



##########
File path: aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
##########
@@ -209,6 +209,12 @@
    */
   public static final String CLIENT_ASSUME_ROLE_REGION = "client.assume-role.region";
 
+  /**
+   * Enables eTag checks for S3 PUT and MULTIPART upload requests.
+   */
+  public static final String CLIENT_ENABLE_ETAG_CHECK = "client.enable.etag-check";

Review comment:
       Would it make more sense to use `client.etag.enable-check` or something with `etag` as the second part?
   
   Some of the other ones seem to follow a pattern of "client".<resource>.<configuration>, which I think makes more sense.






[GitHub] [iceberg] jackye1995 commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r784281870



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -216,43 +247,43 @@ private void uploadParts() {
       return;
     }
 
-    stagingFiles.stream()
+    stagingFilesWithETags.stream()
         // do not upload the file currently being written
-        .filter(f -> closed || !f.equals(currentStagingFile))
+        .filter(f -> closed || !f.file().equals(currentStagingFile))
         // do not upload any files that have already been processed
-        .filter(Predicates.not(multiPartMap::containsKey))
+        .filter(Predicates.not(f -> multiPartMap.containsKey(f.file())))
         .forEach(f -> {
           UploadPartRequest.Builder requestBuilder = UploadPartRequest.builder()
               .bucket(location.bucket())
               .key(location.key())
               .uploadId(multipartUploadId)
-              .partNumber(stagingFiles.indexOf(f) + 1)
-              .contentLength(f.length());
+              .partNumber(stagingFilesWithETags.indexOf(f) + 1)
+              .contentLength(f.file().length());
 
           S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
 
           UploadPartRequest uploadRequest = requestBuilder.build();
 
           CompletableFuture<CompletedPart> future = CompletableFuture.supplyAsync(
               () -> {
-                UploadPartResponse response = s3.uploadPart(uploadRequest, RequestBody.fromFile(f));
+                UploadPartResponse response = s3.uploadPart(uploadRequest, RequestBody.fromFile(f.file()));
+                checkEtag(f.eTag(), response.eTag());

Review comment:
       > With that and the fact that current approach allows to add a better error message, I would propose doing explicit checks here.
   
   I consulted with the S3 team about this. Doing it on the server side is definitely preferred, as they have dedicated hardware for this operation, compared to a client running the checksum in an arbitrary environment.






[GitHub] [iceberg] danielcweeks commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r789143594



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -360,4 +416,28 @@ protected void finalize() throws Throwable {
       LOG.warn("Unclosed output stream created by:\n\t{}", trace);
     }
   }
+
+  private static class FileAndDigest {

Review comment:
       Could we make this extend File or make it more generic (FileReference), since we use it both when we have a digest and when we don't?
   
   This could simplify the etags checks on upload to just be:
   
   ```
   if (file.hasDigest()) {
     requestBuilder.contentMD5(...)
   }
   ```
   
   It just feels like we're using the etag flag to drive a lot of logic in different places.
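One way to read the suggestion above is a small holder whose digest is optional, so callers branch on `hasDigest()` instead of on a separate flag. This is a sketch under stated assumptions: the class name `FileAndDigestSketch` and the `contentMd5()` helper are hypothetical, and storing the digest as raw bytes is only one of the options discussed in this thread.

```java
import java.io.File;
import java.util.Base64;

// Hypothetical holder: a staging file plus an optional MD5 digest of its contents.
final class FileAndDigestSketch {
  private final File file;
  private byte[] digest; // stays null when checksum validation is disabled

  FileAndDigestSketch(File file) {
    this.file = file;
  }

  File file() {
    return file;
  }

  void setDigest(byte[] newDigest) {
    // Defensive copy so later changes to the caller's array do not leak in.
    this.digest = newDigest.clone();
  }

  boolean hasDigest() {
    return digest != null;
  }

  // Base64 form of the digest, suitable for a Content-MD5 header.
  String contentMd5() {
    if (!hasDigest()) {
      throw new IllegalStateException("No digest recorded for " + file);
    }
    return Base64.getEncoder().encodeToString(digest);
  }
}
```

With a holder like this, the upload path only needs the `hasDigest()` check shown in the comment above.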






[GitHub] [iceberg] danielcweeks merged pull request #3813: AWS: Support checksum validation with S3 eTags

Posted by GitBox <gi...@apache.org>.
danielcweeks merged pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813


   




[GitHub] [iceberg] jackye1995 commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r789152670



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -74,13 +79,16 @@
   private final AwsProperties awsProperties;
 
   private CountingOutputStream stream;
-  private final List<File> stagingFiles = Lists.newArrayList();
+  private final List<FileAndDigest> stagingFiles = Lists.newArrayList();
   private final File stagingDirectory;
   private File currentStagingFile;
   private String multipartUploadId;
   private final Map<File, CompletableFuture<CompletedPart>> multiPartMap = Maps.newHashMap();
   private final int multiPartSize;
   private final int multiPartThresholdSize;
+  private final boolean isEtagCheckEnabled;

Review comment:
       nit: old name was not updated

##########
File path: aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java
##########
@@ -77,14 +86,15 @@
   private final String newTmpDirectory = "/tmp/newStagingDirectory";
 
   private final AwsProperties properties = new AwsProperties(ImmutableMap.of(
-      AwsProperties.S3FILEIO_MULTIPART_SIZE, Integer.toString(5 * 1024 * 1024),
+      AwsProperties.S3FILEIO_MULTIPART_SIZE, Integer.toString(FIVE_MBS),

Review comment:
       this change does not seem necessary

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -311,16 +344,39 @@ private void completeUploads() {
           .bucket(location.bucket())
           .key(location.key());
 
+      if (isEtagCheckEnabled) {
+        requestBuilder.contentMD5(BinaryUtils.toBase64(completeMessageDigest.digest()));
+      }
+
       S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
       S3RequestUtil.configurePermission(awsProperties, requestBuilder);
 
-      s3.putObject(requestBuilder.build(), RequestBody.fromInputStream(contentStream, contentLength));
+      PutObjectRequest putObjectRequest = requestBuilder.build();
+      try {
+        s3.putObject(putObjectRequest, RequestBody.fromInputStream(contentStream, contentLength));
+      } catch (UncheckedIOException uncheckedIOException) {
+        logS3RequestAndThrow(uncheckedIOException, putObjectRequest.toString());
+      }
     } else {
       uploadParts();
       completeMultiPartUpload();
     }
   }
 
+  private void closeStream() throws IOException {
+    if (isEtagCheckEnabled) {
+      if (stagingFiles.size() > 0) {
+        stagingFiles.get(stagingFiles.size() - 1).setDigest(currentPartMessageDigest.digest());
+      }
+    }
+    stream.close();
+  }
+
+  private static void logS3RequestAndThrow(UncheckedIOException uncheckedIOException, String requestString) {
+    LOG.error("S3 Request Failure: {}", requestString);
+    throw uncheckedIOException;

Review comment:
       +1, logging the full request also has security implications for many applications and should be avoided.

##########
File path: aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
##########
@@ -373,4 +384,12 @@ public String dynamoDbTableName() {
   public void setDynamoDbTableName(String name) {
     this.dynamoDbTableName = name;
   }
+
+  public boolean isS3ChecksumEnabled() {
+    return this.isS3ChecksumEnabled;
+  }
+
+  public void setS3ChecksumEnabled(boolean eTagCheckEnabled) {

Review comment:
       nit: `eTagCheckEnabled` was not updated from the old name, maybe just `enabled`?

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -218,41 +241,50 @@ private void uploadParts() {
 
     stagingFiles.stream()
         // do not upload the file currently being written
-        .filter(f -> closed || !f.equals(currentStagingFile))
+        .filter(f -> closed || !f.file().equals(currentStagingFile))
         // do not upload any files that have already been processed
-        .filter(Predicates.not(multiPartMap::containsKey))
-        .forEach(f -> {
+        .filter(Predicates.not(f -> multiPartMap.containsKey(f.file())))
+        .forEach(fileAndDigest -> {
+          File file = fileAndDigest.file();

Review comment:
       I think we can name this variable `f` to avoid name changing in subsequent lines






[GitHub] [iceberg] SinghAsDev commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
SinghAsDev commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r789207426



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -218,32 +241,43 @@ private void uploadParts() {
 
     stagingFiles.stream()
         // do not upload the file currently being written
-        .filter(f -> closed || !f.equals(currentStagingFile))
+        .filter(f -> closed || !f.file().equals(currentStagingFile))
         // do not upload any files that have already been processed
-        .filter(Predicates.not(multiPartMap::containsKey))
-        .forEach(f -> {
+        .filter(Predicates.not(f -> multiPartMap.containsKey(f.file())))
+        .forEach(fileAndDigest -> {
+          File file = fileAndDigest.file();
           UploadPartRequest.Builder requestBuilder = UploadPartRequest.builder()
               .bucket(location.bucket())
               .key(location.key())
               .uploadId(multipartUploadId)
-              .partNumber(stagingFiles.indexOf(f) + 1)
-              .contentLength(f.length());
+              .partNumber(stagingFiles.indexOf(fileAndDigest) + 1)
+              .contentLength(file.length());
+
+          if (fileAndDigest.hasDigest()) {
+            requestBuilder.contentMD5(BinaryUtils.toBase64(fileAndDigest.getDigest()));
+          }
 
           S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
 
           UploadPartRequest uploadRequest = requestBuilder.build();
 
           CompletableFuture<CompletedPart> future = CompletableFuture.supplyAsync(
               () -> {
-                UploadPartResponse response = s3.uploadPart(uploadRequest, RequestBody.fromFile(f));
+                UploadPartResponse response = null;
+                try {
+                  response = s3.uploadPart(uploadRequest, RequestBody.fromFile(file));
+                } catch (UncheckedIOException uncheckedIOException) {
+                  throw new UncheckedIOException(

Review comment:
       I think stripping the message from the original `UncheckedIOException` is probably OK, as I did not see any helpful info in it, but that content is not guaranteed to stay the same. The security concern raised by @jackye1995 about logging the entire request (which I believe is fine, as the sensitive parts of requests are already redacted) is also still an open conversation.






[GitHub] [iceberg] SinghAsDev commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
SinghAsDev commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r779102577



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -216,43 +247,43 @@ private void uploadParts() {
       return;
     }
 
-    stagingFiles.stream()
+    stagingFilesWithETags.stream()
         // do not upload the file currently being written
-        .filter(f -> closed || !f.equals(currentStagingFile))
+        .filter(f -> closed || !f.file().equals(currentStagingFile))
         // do not upload any files that have already been processed
-        .filter(Predicates.not(multiPartMap::containsKey))
+        .filter(Predicates.not(f -> multiPartMap.containsKey(f.file())))
         .forEach(f -> {
           UploadPartRequest.Builder requestBuilder = UploadPartRequest.builder()
               .bucket(location.bucket())
               .key(location.key())
               .uploadId(multipartUploadId)
-              .partNumber(stagingFiles.indexOf(f) + 1)
-              .contentLength(f.length());
+              .partNumber(stagingFilesWithETags.indexOf(f) + 1)
+              .contentLength(f.file().length());
 
           S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
 
           UploadPartRequest uploadRequest = requestBuilder.build();
 
           CompletableFuture<CompletedPart> future = CompletableFuture.supplyAsync(
               () -> {
-                UploadPartResponse response = s3.uploadPart(uploadRequest, RequestBody.fromFile(f));
+                UploadPartResponse response = s3.uploadPart(uploadRequest, RequestBody.fromFile(f.file()));
+                checkEtag(f.eTag(), response.eTag());

Review comment:
       Yeah, I was initially doing that as well. However, I could not find a way to add a reliable test: tests would succeed even with wrong MD5 checksums added to the request, likely because of the S3 mock. Given that, and the fact that the current approach allows a better error message, I would propose doing explicit checks here. However, if you have a strong preference on this, I can change it. Let me know.
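
An explicit check of the kind proposed above could look roughly like the following. This is a minimal sketch, not the PR's `checkEtag`: the class `EtagCheckSketch` and its helpers are hypothetical, and it assumes the eTag returned for a part is the quoted, hex-encoded MD5 of the uploaded bytes, which holds for plaintext or SSE-S3 uploads but not, for example, for SSE-KMS.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper for illustration; not the PR's implementation.
final class EtagCheckSketch {
  private EtagCheckSketch() {
  }

  // MD5 of the bytes that were staged locally for one part.
  static byte[] md5(byte[] data) {
    try {
      return MessageDigest.getInstance("MD5").digest(data);
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 is not available", e);
    }
  }

  // Lower-case hex encoding, the form used by MD5-based eTags.
  static String toHex(byte[] bytes) {
    StringBuilder sb = new StringBuilder(bytes.length * 2);
    for (byte b : bytes) {
      sb.append(String.format("%02x", b & 0xff));
    }
    return sb.toString();
  }

  // Assumption: responseEtag is the hex MD5 of the part, wrapped in double quotes.
  static void checkEtag(byte[] localDigest, String responseEtag) {
    String expected = toHex(localDigest);
    String actual = responseEtag.replace("\"", "");
    if (!expected.equalsIgnoreCase(actual)) {
      throw new IllegalStateException(
          String.format("eTag mismatch: expected %s but S3 returned %s", expected, actual));
    }
  }
}
```

The trade-off discussed in the thread is that a client-side comparison like this gives full control over the error message, while sending the checksum with the request lets the server reject a corrupted upload.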






[GitHub] [iceberg] SinghAsDev commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
SinghAsDev commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r791053575



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -74,13 +79,16 @@
   private final AwsProperties awsProperties;
 
   private CountingOutputStream stream;
-  private final List<File> stagingFiles = Lists.newArrayList();
+  private final List<FileAndDigest> stagingFiles = Lists.newArrayList();
   private final File stagingDirectory;
   private File currentStagingFile;
   private String multipartUploadId;
   private final Map<File, CompletableFuture<CompletedPart>> multiPartMap = Maps.newHashMap();
   private final int multiPartSize;
   private final int multiPartThresholdSize;
+  private final boolean isEtagCheckEnabled;
+  private final MessageDigest completeMessageDigest;
+  private final MessageDigest currentPartMessageDigest;

Review comment:
       Actually, I just realized we can turn off the digest updates in `DigestOutputStream` once we have crossed the multipart threshold size. Will add that.
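
The JDK's `DigestOutputStream.on(boolean)` is the switch this comment refers to. The following is a minimal, self-contained sketch of the idea rather than the PR's code: the class `ThresholdDigestSketch`, the method `digestUntilThreshold`, and the in-memory sink are all hypothetical stand-ins for the staging-file stream.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical demo class; not part of S3OutputStream.
final class ThresholdDigestSketch {
  private ThresholdDigestSketch() {
  }

  static MessageDigest newMd5() {
    try {
      return MessageDigest.getInstance("MD5");
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 is not available", e);
    }
  }

  // Writes the chunks through a DigestOutputStream and switches digest updates
  // off as soon as more than `threshold` bytes have been written. The returned
  // digest covers only the bytes written while updates were still on.
  static byte[] digestUntilThreshold(byte[][] chunks, long threshold) {
    MessageDigest md5 = newMd5();
    long written = 0;
    try (DigestOutputStream out = new DigestOutputStream(new ByteArrayOutputStream(), md5)) {
      for (byte[] chunk : chunks) {
        out.write(chunk, 0, chunk.length);
        written += chunk.length;
        if (written > threshold) {
          // later writes still reach the sink but no longer update the digest
          out.on(false);
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return md5.digest();
  }
}
```

In the setting discussed here, the whole-object digest is only needed for a single put request, so once the threshold is crossed the partial value would simply be discarded.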






[GitHub] [iceberg] rdblue commented on pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#issuecomment-1015617355


   @danielcweeks, can you take a look at this?




[GitHub] [iceberg] SinghAsDev commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
SinghAsDev commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r788266424



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -178,7 +208,13 @@ private void newStream() throws IOException {
     createStagingDirectoryIfNotExists();
     currentStagingFile = File.createTempFile("s3fileio-", ".tmp", stagingDirectory);
     currentStagingFile.deleteOnExit();
-    stagingFiles.add(currentStagingFile);
+    try {
+      currentPartMessageDigest = isEtagCheckEnabled ? MessageDigest.getInstance(digestAlgorithm) : null;
+    } catch (NoSuchAlgorithmException e) {
+      throw new IOException(e);

Review comment:
       Yeah, since there are public methods that call `newStream` and only throw `IOException`, I wanted to avoid changing their signatures.

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -360,4 +421,22 @@ protected void finalize() throws Throwable {
       LOG.warn("Unclosed output stream created by:\n\t{}", trace);
     }
   }
+
+  private static class FileWithEtag {
+    private final File file;
+    private final MessageDigest eTag;

Review comment:
       I think we can do that. It will make the logic a bit more complicated, as we close the stream in multiple places. Will update it.

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -218,41 +254,49 @@ private void uploadParts() {
 
     stagingFiles.stream()
         // do not upload the file currently being written
-        .filter(f -> closed || !f.equals(currentStagingFile))
+        .filter(f -> closed || !f.file().equals(currentStagingFile))
         // do not upload any files that have already been processed
-        .filter(Predicates.not(multiPartMap::containsKey))
+        .filter(Predicates.not(f -> multiPartMap.containsKey(f.file())))
         .forEach(f -> {
           UploadPartRequest.Builder requestBuilder = UploadPartRequest.builder()
               .bucket(location.bucket())
               .key(location.key())
               .uploadId(multipartUploadId)
               .partNumber(stagingFiles.indexOf(f) + 1)
-              .contentLength(f.length());
+              .contentLength(f.file().length());

Review comment:
       Good point, updating.

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -311,16 +356,32 @@ private void completeUploads() {
           .bucket(location.bucket())
           .key(location.key());
 
+      if (isEtagCheckEnabled) {
+        requestBuilder.contentMD5(BinaryUtils.toBase64(completeMessageDigest.digest()));
+      }
+
       S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
       S3RequestUtil.configurePermission(awsProperties, requestBuilder);
 
-      s3.putObject(requestBuilder.build(), RequestBody.fromInputStream(contentStream, contentLength));
+      PutObjectRequest putObjectRequest = requestBuilder.build();
+      try {
+        s3.putObject(putObjectRequest, RequestBody.fromInputStream(contentStream, contentLength));
+      } catch (UncheckedIOException uncheckedIOException) {
+        checkProtocolException(uncheckedIOException, putObjectRequest.toString());
+      }
     } else {
       uploadParts();
       completeMultiPartUpload();
     }
   }
 
+  private static void checkProtocolException(UncheckedIOException uncheckedIOException, String requestString) {
+    if (uncheckedIOException.getCause() instanceof ProtocolException) {
+      LOG.error("S3 Request Failure: {}", requestString);
+      throw uncheckedIOException;

Review comment:
       I initially wanted to log only if it is a `ProtocolException`, but now I think it is better to log regardless of the cause. Updating to that.
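
Logging regardless of the cause and then rethrowing the same exception can be sketched as below. This is an illustration under assumptions, not the PR's code: `LogAndRethrowSketch` and `callAndLogFailure` are hypothetical names, and a `Consumer<String>` stands in for `LOG.error` so the example stays self-contained.

```java
import java.io.UncheckedIOException;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical wrapper; the PR works with S3 requests and an SLF4J logger instead.
final class LogAndRethrowSketch {
  private LogAndRethrowSketch() {
  }

  // Runs the call; on any UncheckedIOException, logs the request and rethrows
  // the original exception unchanged so callers still see the real failure.
  static <T> T callAndLogFailure(Supplier<T> call, String requestString, Consumer<String> errorLog) {
    try {
      return call.get();
    } catch (UncheckedIOException e) {
      errorLog.accept("S3 Request Failure: " + requestString);
      throw e;
    }
  }
}
```

Because the exception is always rethrown, a failure can no longer be silently swallowed when its cause is not a `ProtocolException`.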






[GitHub] [iceberg] rdblue commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r788252447



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -218,41 +254,49 @@ private void uploadParts() {
 
     stagingFiles.stream()
         // do not upload the file currently being written
-        .filter(f -> closed || !f.equals(currentStagingFile))
+        .filter(f -> closed || !f.file().equals(currentStagingFile))
         // do not upload any files that have already been processed
-        .filter(Predicates.not(multiPartMap::containsKey))
+        .filter(Predicates.not(f -> multiPartMap.containsKey(f.file())))
         .forEach(f -> {
           UploadPartRequest.Builder requestBuilder = UploadPartRequest.builder()
               .bucket(location.bucket())
               .key(location.key())
               .uploadId(multipartUploadId)
               .partNumber(stagingFiles.indexOf(f) + 1)
-              .contentLength(f.length());
+              .contentLength(f.file().length());

Review comment:
       Within this block, there would be a lot fewer changes if you left `f` unchanged: `.forEach(fileAndDigest -> { File f = fileAndDigest.file(); ... })`.






[GitHub] [iceberg] SinghAsDev commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
SinghAsDev commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r789055918



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -74,13 +78,16 @@
   private final AwsProperties awsProperties;
 
   private CountingOutputStream stream;
-  private final List<File> stagingFiles = Lists.newArrayList();
+  private final List<FileAndDigest> stagingFiles = Lists.newArrayList();
   private final File stagingDirectory;
   private File currentStagingFile;
   private String multipartUploadId;
   private final Map<File, CompletableFuture<CompletedPart>> multiPartMap = Maps.newHashMap();
   private final int multiPartSize;
   private final int multiPartThresholdSize;
+  private final boolean isEtagCheckEnabled;
+  private final MessageDigest completeMessageDigest;
+  private MessageDigest currentPartMessageDigest;

Review comment:
       Thanks @danielcweeks for the review. We don't compute a full checksum if we are doing a multipart upload; we only compute it for a single put request. If I understand correctly, wrapping the stream with `DigestOutputStream` would make us handle counts explicitly, which would defeat the purpose of this suggestion, namely simplifying the changes. Please correct me if I am missing something here.






[GitHub] [iceberg] SinghAsDev commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
SinghAsDev commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r788256849



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -311,16 +356,32 @@ private void completeUploads() {
           .bucket(location.bucket())
           .key(location.key());
 
+      if (isEtagCheckEnabled) {
+        requestBuilder.contentMD5(BinaryUtils.toBase64(completeMessageDigest.digest()));
+      }
+
       S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
       S3RequestUtil.configurePermission(awsProperties, requestBuilder);
 
-      s3.putObject(requestBuilder.build(), RequestBody.fromInputStream(contentStream, contentLength));
+      PutObjectRequest putObjectRequest = requestBuilder.build();
+      try {
+        s3.putObject(putObjectRequest, RequestBody.fromInputStream(contentStream, contentLength));
+      } catch (UncheckedIOException uncheckedIOException) {
+        checkProtocolException(uncheckedIOException, putObjectRequest.toString());
+      }
     } else {
       uploadParts();
       completeMultiPartUpload();
     }
   }
 
+  private static void checkProtocolException(UncheckedIOException uncheckedIOException, String requestString) {
+    if (uncheckedIOException.getCause() instanceof ProtocolException) {
+      LOG.error("S3 Request Failure: {}", requestString);
+      throw uncheckedIOException;

Review comment:
       Oh, this is a mistake. Thanks for catching this, @rdblue. Will fix it shortly.






[GitHub] [iceberg] rdblue commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r789820847



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -218,32 +241,43 @@ private void uploadParts() {
 
     stagingFiles.stream()
         // do not upload the file currently being written
-        .filter(f -> closed || !f.equals(currentStagingFile))
+        .filter(f -> closed || !f.file().equals(currentStagingFile))
         // do not upload any files that have already been processed
-        .filter(Predicates.not(multiPartMap::containsKey))
-        .forEach(f -> {
+        .filter(Predicates.not(f -> multiPartMap.containsKey(f.file())))
+        .forEach(fileAndDigest -> {
+          File file = fileAndDigest.file();
           UploadPartRequest.Builder requestBuilder = UploadPartRequest.builder()
               .bucket(location.bucket())
               .key(location.key())
               .uploadId(multipartUploadId)
-              .partNumber(stagingFiles.indexOf(f) + 1)
-              .contentLength(f.length());
+              .partNumber(stagingFiles.indexOf(fileAndDigest) + 1)
+              .contentLength(file.length());
+
+          if (fileAndDigest.hasDigest()) {
+            requestBuilder.contentMD5(BinaryUtils.toBase64(fileAndDigest.getDigest()));
+          }
 
           S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
 
           UploadPartRequest uploadRequest = requestBuilder.build();
 
           CompletableFuture<CompletedPart> future = CompletableFuture.supplyAsync(
               () -> {
-                UploadPartResponse response = s3.uploadPart(uploadRequest, RequestBody.fromFile(f));
+                UploadPartResponse response = null;
+                try {
+                  response = s3.uploadPart(uploadRequest, RequestBody.fromFile(file));
+                } catch (UncheckedIOException uncheckedIOException) {
+                  throw new UncheckedIOException(

Review comment:
       What was the security concern? I don't think that this should be catching the exception and editing it, unless there's a strong reason. Since `UncheckedIOException` is a wrapper produced by other Iceberg code, it's very unlikely to add information that is a security concern.






[GitHub] [iceberg] rdblue commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r789822658



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -360,4 +416,28 @@ protected void finalize() throws Throwable {
       LOG.warn("Unclosed output stream created by:\n\t{}", trace);
     }
   }
+
+  private static class FileAndDigest {
+    private final File file;
+    private byte[] digest;
+    private boolean isDigestSet = false;
+
+    FileAndDigest(File file) {
+      this.file = file;
+    }
+
+    File file() {
+      return file;
+    }
+
+    void setDigest(byte[] digest) {

Review comment:
       In that case, I think it is a reason to go back to storing the digest. I didn't realize that the digest was getting updated after creating this. Seems like there should be a cleaner way where we update the digest and create an immutable object when the file is closed.
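
One way to read this suggestion is a value class that is only constructed once the staging file is closed and its digest is final. The sketch below is hypothetical: `ImmutableFileAndDigest` is an illustrative name (the class in the diff is `FileAndDigest`), and it assumes a `null` digest means checksums are disabled.

```java
import java.io.File;

// Hypothetical immutable variant of the FileAndDigest class shown in the diff.
final class ImmutableFileAndDigest {
  private final File file;
  private final byte[] digest; // null when checksum validation is disabled

  // Intended to be called after the staging file is closed, with the result of
  // MessageDigest.digest() for that file (or null if checksums are off).
  ImmutableFileAndDigest(File file, byte[] digest) {
    this.file = file;
    // defensive copy so later changes to the caller's array cannot leak in
    this.digest = digest == null ? null : digest.clone();
  }

  File file() {
    return file;
  }

  boolean hasDigest() {
    return digest != null;
  }

  byte[] digest() {
    // return a copy to keep the stored bytes immutable
    return digest == null ? null : digest.clone();
  }
}
```

With this shape there is no setter and no `isDigestSet` flag; whether a digest exists is decided once, at construction time.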






[GitHub] [iceberg] rdblue commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r790319167



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -218,41 +241,50 @@ private void uploadParts() {
 
     stagingFiles.stream()
         // do not upload the file currently being written
-        .filter(f -> closed || !f.equals(currentStagingFile))
+        .filter(f -> closed || !f.file().equals(currentStagingFile))
         // do not upload any files that have already been processed
-        .filter(Predicates.not(multiPartMap::containsKey))
-        .forEach(f -> {
+        .filter(Predicates.not(f -> multiPartMap.containsKey(f.file())))
+        .forEach(fileAndDigest -> {
+          File file = fileAndDigest.file();

Review comment:
       To avoid conflicts.






[GitHub] [iceberg] rdblue commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r789133403



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -360,4 +416,28 @@ protected void finalize() throws Throwable {
       LOG.warn("Unclosed output stream created by:\n\t{}", trace);
     }
   }
+
+  private static class FileAndDigest {
+    private final File file;
+    private byte[] digest;
+    private boolean isDigestSet = false;
+
+    FileAndDigest(File file) {
+      this.file = file;
+    }
+
+    File file() {
+      return file;
+    }
+
+    void setDigest(byte[] digest) {

Review comment:
       Does this need to be mutable? Why not just pass in `digest` when creating? I thought that was possible before.






[GitHub] [iceberg] SinghAsDev commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
SinghAsDev commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r789850804



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -74,13 +79,16 @@
   private final AwsProperties awsProperties;
 
   private CountingOutputStream stream;
-  private final List<File> stagingFiles = Lists.newArrayList();
+  private final List<FileAndDigest> stagingFiles = Lists.newArrayList();
   private final File stagingDirectory;
   private File currentStagingFile;
   private String multipartUploadId;
   private final Map<File, CompletableFuture<CompletedPart>> multiPartMap = Maps.newHashMap();
   private final int multiPartSize;
   private final int multiPartThresholdSize;
+  private final boolean isEtagCheckEnabled;
+  private final MessageDigest completeMessageDigest;
+  private final MessageDigest currentPartMessageDigest;

Review comment:
       Yeah, I was thinking about this too. We can't do that if we use the digest stream, though; I will have to revert to my previous implementation, which maintains and updates the digest in writes.






[GitHub] [iceberg] SinghAsDev commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
SinghAsDev commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r785240905



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -216,43 +247,43 @@ private void uploadParts() {
       return;
     }
 
-    stagingFiles.stream()
+    stagingFilesWithETags.stream()
         // do not upload the file currently being written
-        .filter(f -> closed || !f.equals(currentStagingFile))
+        .filter(f -> closed || !f.file().equals(currentStagingFile))
         // do not upload any files that have already been processed
-        .filter(Predicates.not(multiPartMap::containsKey))
+        .filter(Predicates.not(f -> multiPartMap.containsKey(f.file())))
         .forEach(f -> {
           UploadPartRequest.Builder requestBuilder = UploadPartRequest.builder()
               .bucket(location.bucket())
               .key(location.key())
               .uploadId(multipartUploadId)
-              .partNumber(stagingFiles.indexOf(f) + 1)
-              .contentLength(f.length());
+              .partNumber(stagingFilesWithETags.indexOf(f) + 1)
+              .contentLength(f.file().length());
 
           S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
 
           UploadPartRequest uploadRequest = requestBuilder.build();
 
           CompletableFuture<CompletedPart> future = CompletableFuture.supplyAsync(
               () -> {
-                UploadPartResponse response = s3.uploadPart(uploadRequest, RequestBody.fromFile(f));
+                UploadPartResponse response = s3.uploadPart(uploadRequest, RequestBody.fromFile(f.file()));
+                checkEtag(f.eTag(), response.eTag());

Review comment:
       @jackye1995 I think this is good now. I verified that a wrong MD5 causes protocol exceptions from the S3 server side.
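
For reference, the value sent with the request is the Base64 encoding of the raw 16-byte MD5 digest, not the hex string. Other revisions of the diff in this thread build it with `BinaryUtils.toBase64(...)`; the sketch below uses only the JDK's `java.util.Base64` instead, and the class `ContentMd5Sketch` is a hypothetical name for illustration.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Hypothetical helper; the PR relies on the AWS SDK's BinaryUtils instead.
final class ContentMd5Sketch {
  private ContentMd5Sketch() {
  }

  // Returns the Content-MD5 value for a request body: Base64 of the MD5 digest bytes.
  static String contentMd5(byte[] body) {
    try {
      byte[] digest = MessageDigest.getInstance("MD5").digest(body);
      return Base64.getEncoder().encodeToString(digest);
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 is not available", e);
    }
  }
}
```

If the bytes that reach the server do not hash to this value, the server rejects the upload, which is the failure mode verified in the comment above.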






[GitHub] [iceberg] SinghAsDev commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
SinghAsDev commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r791053575



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -74,13 +79,16 @@
   private final AwsProperties awsProperties;
 
   private CountingOutputStream stream;
-  private final List<File> stagingFiles = Lists.newArrayList();
+  private final List<FileAndDigest> stagingFiles = Lists.newArrayList();
   private final File stagingDirectory;
   private File currentStagingFile;
   private String multipartUploadId;
   private final Map<File, CompletableFuture<CompletedPart>> multiPartMap = Maps.newHashMap();
   private final int multiPartSize;
   private final int multiPartThresholdSize;
+  private final boolean isEtagCheckEnabled;
+  private final MessageDigest completeMessageDigest;
+  private final MessageDigest currentPartMessageDigest;

Review comment:
       Actually, I just realized we can turn off the digest updates in `DigestOutputStream` once we have crossed the multipart threshold size. Will add that.
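A rough sketch of that idea, assuming only the JDK: `DigestOutputStream.on(false)` stops feeding bytes to the associated `MessageDigest` while still passing them through to the underlying stream. The class name, helper names, and the in-memory sink are illustrative and not taken from the PR.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

public class DigestToggleSketch {
  static MessageDigest newMd5() {
    try {
      return MessageDigest.getInstance("MD5");
    } catch (NoSuchAlgorithmException e) {
      throw new RuntimeException("Failed to create MD5 message digest", e);
    }
  }

  // Writes two chunks; when stopAfterFirst is true the digest only covers the first chunk.
  static byte[] digestOf(byte[] first, byte[] second, boolean stopAfterFirst) {
    MessageDigest md5 = newMd5();
    try (DigestOutputStream out = new DigestOutputStream(new ByteArrayOutputStream(), md5)) {
      out.write(first);
      if (stopAfterFirst) {
        // Stands in for "the multipart threshold has been crossed".
        out.on(false);
      }
      out.write(second);
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to write to in-memory stream", e);
    }
    return md5.digest();
  }

  public static void main(String[] args) {
    byte[] head = "part-one".getBytes(StandardCharsets.UTF_8);
    byte[] tail = "part-two".getBytes(StandardCharsets.UTF_8);
    boolean headOnly = Arrays.equals(digestOf(head, tail, true), newMd5().digest(head));
    System.out.println("digest covers only the first chunk: " + headOnly);
  }
}
```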






[GitHub] [iceberg] danielcweeks commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r790324462



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -360,4 +416,28 @@ protected void finalize() throws Throwable {
       LOG.warn("Unclosed output stream created by:\n\t{}", trace);
     }
   }
+
+  private static class FileAndDigest {
+    private final File file;
+    private byte[] digest;
+    private boolean isDigestSet = false;
+
+    FileAndDigest(File file) {
+      this.file = file;
+    }
+
+    File file() {
+      return file;
+    }
+
+    void setDigest(byte[] digest) {

Review comment:
       +1 to Ryan's comment.  Keep a digest per file.  It's a trivial amount of memory and better encapsulated.






[GitHub] [iceberg] rdblue commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r791974185



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -110,6 +118,12 @@
     multiPartSize = awsProperties.s3FileIoMultiPartSize();
     multiPartThresholdSize =  (int) (multiPartSize * awsProperties.s3FileIOMultipartThresholdFactor());
     stagingDirectory = new File(awsProperties.s3fileIoStagingDirectory());
+    isChecksumEnabled = awsProperties.isS3ChecksumEnabled();
+    try {
+      completeMessageDigest = isChecksumEnabled ? MessageDigest.getInstance(digestAlgorithm) : null;
+    } catch (NoSuchAlgorithmException e) {
+      throw new RuntimeException("Failed to create message digest needed for s3 checksum checks.", e);

Review comment:
       Nit: no need for punctuation at the end of error messages.






[GitHub] [iceberg] wizardxz commented on pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
wizardxz commented on pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#issuecomment-1005107531


   LGTM




[GitHub] [iceberg] kbendick commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r780534493



##########
File path: aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
##########
@@ -209,6 +209,12 @@
    */
   public static final String CLIENT_ASSUME_ROLE_REGION = "client.assume-role.region";
 
+  /**
+   * Enables eTag checks for S3 PUT and MULTIPART upload requests.
+   */
+  public static final String CLIENT_ENABLE_ETAG_CHECK = "client.enable.etag-check";

Review comment:
       I'm a big fan of `s3` as the prefix, and `s3.checksum.enabled` would be my preferred choice.
   
   This explains what the intent is, whereas `etag-check` describes how it works and isn't as informative for users who don't know the underlying details of S3.
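To illustrate how a flag named this way might be read, here is a small sketch over a plain `Map<String, String>` of catalog properties. The constant names, the helper, and the default of `false` are assumptions made for the example; only the key `s3.checksum.enabled` comes from the suggestion above.

```java
import java.util.HashMap;
import java.util.Map;

public class ChecksumPropertySketch {
  // Key proposed in this review; the default value below is an assumption.
  static final String S3_CHECKSUM_ENABLED = "s3.checksum.enabled";
  static final boolean S3_CHECKSUM_ENABLED_DEFAULT = false;

  // Hypothetical helper: falls back to the default when the key is absent.
  static boolean isChecksumEnabled(Map<String, String> properties) {
    String value = properties.get(S3_CHECKSUM_ENABLED);
    return value == null ? S3_CHECKSUM_ENABLED_DEFAULT : Boolean.parseBoolean(value);
  }

  public static void main(String[] args) {
    Map<String, String> properties = new HashMap<>();
    System.out.println("unset: " + isChecksumEnabled(properties));
    properties.put(S3_CHECKSUM_ENABLED, "true");
    System.out.println("set:   " + isChecksumEnabled(properties));
  }
}
```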






[GitHub] [iceberg] SinghAsDev commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
SinghAsDev commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r784319125



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -216,43 +247,43 @@ private void uploadParts() {
       return;
     }
 
-    stagingFiles.stream()
+    stagingFilesWithETags.stream()
         // do not upload the file currently being written
-        .filter(f -> closed || !f.equals(currentStagingFile))
+        .filter(f -> closed || !f.file().equals(currentStagingFile))
         // do not upload any files that have already been processed
-        .filter(Predicates.not(multiPartMap::containsKey))
+        .filter(Predicates.not(f -> multiPartMap.containsKey(f.file())))
         .forEach(f -> {
           UploadPartRequest.Builder requestBuilder = UploadPartRequest.builder()
               .bucket(location.bucket())
               .key(location.key())
               .uploadId(multipartUploadId)
-              .partNumber(stagingFiles.indexOf(f) + 1)
-              .contentLength(f.length());
+              .partNumber(stagingFilesWithETags.indexOf(f) + 1)
+              .contentLength(f.file().length());
 
           S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
 
           UploadPartRequest uploadRequest = requestBuilder.build();
 
           CompletableFuture<CompletedPart> future = CompletableFuture.supplyAsync(
               () -> {
-                UploadPartResponse response = s3.uploadPart(uploadRequest, RequestBody.fromFile(f));
+                UploadPartResponse response = s3.uploadPart(uploadRequest, RequestBody.fromFile(f.file()));
+                checkEtag(f.eTag(), response.eTag());

Review comment:
       Thanks @jackye1995. Let me update the diff in that case. Will try to do so this week.






[GitHub] [iceberg] SinghAsDev commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
SinghAsDev commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r789191666



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -178,7 +208,13 @@ private void newStream() throws IOException {
     createStagingDirectoryIfNotExists();
     currentStagingFile = File.createTempFile("s3fileio-", ".tmp", stagingDirectory);
     currentStagingFile.deleteOnExit();
-    stagingFiles.add(currentStagingFile);
+    try {
+      currentPartMessageDigest = isEtagCheckEnabled ? MessageDigest.getInstance(digestAlgorithm) : null;
+    } catch (NoSuchAlgorithmException e) {
+      throw new IOException(e);

Review comment:
       Sure, changed this to RuntimeException.






[GitHub] [iceberg] rdblue commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r788251729



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -360,4 +421,22 @@ protected void finalize() throws Throwable {
       LOG.warn("Unclosed output stream created by:\n\t{}", trace);
     }
   }
+
+  private static class FileWithEtag {
+    private final File file;
+    private final MessageDigest eTag;

Review comment:
       Why keep the `MessageDigest` around when the only use of it is to immediately call `digest()` on line 269? Seems like this should probably track the digest bytes with the file instead.






[GitHub] [iceberg] danielcweeks commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r789144125



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -172,15 +187,23 @@ public void write(byte[] b, int off, int len) throws IOException {
 
   private void newStream() throws IOException {
     if (stream != null) {
-      stream.close();
+      closeStream();
     }
 
     createStagingDirectoryIfNotExists();
     currentStagingFile = File.createTempFile("s3fileio-", ".tmp", stagingDirectory);
     currentStagingFile.deleteOnExit();
-    stagingFiles.add(currentStagingFile);
 
-    stream = new CountingOutputStream(new BufferedOutputStream(new FileOutputStream(currentStagingFile)));
+    stagingFiles.add(new FileAndDigest(currentStagingFile));
+
+    if (isEtagCheckEnabled) {
+      currentPartMessageDigest.reset();

Review comment:
       This overlaps with Ryan's comment below, but if we just create a message digest and associate it here, we don't need to update it in the close method below.
   
   Something like:
   
   ```java
   
   stagingFiles.add(new FileAndDigest(currentStagingFile, newMessageDigest));
   ```
   
   It means that we have to create a new digest object per file but also means they're standalone and don't require as much coordination.
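A slightly fuller sketch of that suggestion, assuming only the JDK: each staging file gets its own `MessageDigest`, and a `DigestOutputStream` keeps it updated as bytes are written, so nothing has to be coordinated at close time. `FileAndDigest` here is a simplified stand-in for the class in this PR, and `stage` and `newMd5` are hypothetical helpers.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

public class PerFileDigestSketch {
  // Simplified holder pairing a staging file with its own digest instance.
  static class FileAndDigest {
    private final File file;
    private final MessageDigest digest;

    FileAndDigest(File file, MessageDigest digest) {
      this.file = file;
      this.digest = digest;
    }

    File file() {
      return file;
    }

    MessageDigest digest() {
      return digest;
    }
  }

  static MessageDigest newMd5() {
    try {
      return MessageDigest.getInstance("MD5");
    } catch (NoSuchAlgorithmException e) {
      throw new RuntimeException("Failed to create MD5 message digest", e);
    }
  }

  // Writes content to a fresh temp file while the file's dedicated digest observes every byte.
  static FileAndDigest stage(byte[] content) {
    try {
      File file = File.createTempFile("s3fileio-sketch-", ".tmp");
      file.deleteOnExit();
      FileAndDigest staged = new FileAndDigest(file, newMd5());
      try (OutputStream out = new DigestOutputStream(new FileOutputStream(file), staged.digest())) {
        out.write(content);
      }
      return staged;
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to write staging file", e);
    }
  }

  public static void main(String[] args) {
    FileAndDigest first = stage("part-one".getBytes(StandardCharsets.UTF_8));
    FileAndDigest second = stage("part-two".getBytes(StandardCharsets.UTF_8));
    // MessageDigest.digest() finalizes and resets the instance, so call it once per part.
    boolean same = Arrays.equals(first.digest().digest(), second.digest().digest());
    System.out.println("parts share a digest: " + same);
  }
}
```

The trade-off is the one noted above: one digest object per file, in exchange for parts that are standalone.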






[GitHub] [iceberg] rdblue commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r791969876



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -360,4 +403,26 @@ protected void finalize() throws Throwable {
       LOG.warn("Unclosed output stream created by:\n\t{}", trace);
     }
   }
+
+  private static class FileAndDigest {
+    private final File file;
+    private final MessageDigest digest;
+
+    FileAndDigest(File file, MessageDigest digest) {
+      this.file = file;
+      this.digest = digest;
+    }
+
+    File file() {
+      return file;
+    }
+
+    byte[] getDigest() {

Review comment:
       Iceberg doesn't use `get` because it isn't useful. We either omit it or use a more specific verb. This should just be `digest` since it is a getter, like `file`.
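As an illustration of the naming convention, here is a hypothetical variant of the holder that stores the finished digest bytes and exposes them through `digest()` rather than `getDigest()`. The defensive copies are an addition for the example, not something requested in this review.

```java
import java.io.File;
import java.util.Arrays;

public class AccessorNamingSketch {
  static class FileAndDigest {
    private final File file;
    private final byte[] digest;

    FileAndDigest(File file, byte[] digest) {
      this.file = file;
      this.digest = Arrays.copyOf(digest, digest.length);
    }

    // Accessors are named after the value they return, with no get prefix.
    File file() {
      return file;
    }

    byte[] digest() {
      return Arrays.copyOf(digest, digest.length);
    }
  }

  public static void main(String[] args) {
    FileAndDigest part = new FileAndDigest(new File("part-1.tmp"), new byte[] {1, 2, 3});
    System.out.println(part.file().getName() + " has " + part.digest().length + " digest bytes");
  }
}
```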






[GitHub] [iceberg] jackye1995 commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r778596602



##########
File path: aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
##########
@@ -209,6 +209,12 @@
    */
   public static final String CLIENT_ASSUME_ROLE_REGION = "client.assume-role.region";
 
+  /**
+   * Enables eTag checks for S3 PUT and MULTIPART upload requests.
+   */
+  public static final String CLIENT_ENABLE_ETAG_CHECK = "client.enable.etag-check";

Review comment:
       The name "etag check" doesn't feel very informative to me; people without context have no idea what this is doing. Maybe something like `s3.integrity-check.enabled` or `s3.checksum.enabled` would be better?






[GitHub] [iceberg] SinghAsDev commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
SinghAsDev commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r785090790



##########
File path: aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
##########
@@ -209,6 +209,12 @@
    */
   public static final String CLIENT_ASSUME_ROLE_REGION = "client.assume-role.region";
 
+  /**
+   * Enables eTag checks for S3 PUT and MULTIPART upload requests.
+   */
+  public static final String CLIENT_ENABLE_ETAG_CHECK = "client.enable.etag-check";

Review comment:
       SGTM, updated to `s3.checksum.enabled`






[GitHub] [iceberg] rdblue commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r788250013



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -178,7 +208,13 @@ private void newStream() throws IOException {
     createStagingDirectoryIfNotExists();
     currentStagingFile = File.createTempFile("s3fileio-", ".tmp", stagingDirectory);
     currentStagingFile.deleteOnExit();
-    stagingFiles.add(currentStagingFile);
+    try {
+      currentPartMessageDigest = isEtagCheckEnabled ? MessageDigest.getInstance(digestAlgorithm) : null;
+    } catch (NoSuchAlgorithmException e) {
+      throw new IOException(e);

Review comment:
       Why does this throw `IOException` with no context? I think there should be context at a minimum. I also suspect that this throws `IOException` because that's an allowed checked exception?






[GitHub] [iceberg] SinghAsDev commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
SinghAsDev commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r789168709



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -311,16 +344,39 @@ private void completeUploads() {
           .bucket(location.bucket())
           .key(location.key());
 
+      if (isEtagCheckEnabled) {
+        requestBuilder.contentMD5(BinaryUtils.toBase64(completeMessageDigest.digest()));
+      }
+
       S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
       S3RequestUtil.configurePermission(awsProperties, requestBuilder);
 
-      s3.putObject(requestBuilder.build(), RequestBody.fromInputStream(contentStream, contentLength));
+      PutObjectRequest putObjectRequest = requestBuilder.build();
+      try {
+        s3.putObject(putObjectRequest, RequestBody.fromInputStream(contentStream, contentLength));
+      } catch (UncheckedIOException uncheckedIOException) {
+        logS3RequestAndThrow(uncheckedIOException, putObjectRequest.toString());
+      }
     } else {
       uploadParts();
       completeMultiPartUpload();
     }
   }
 
+  private void closeStream() throws IOException {
+    if (isEtagCheckEnabled) {
+      if (stagingFiles.size() > 0) {
+        stagingFiles.get(stagingFiles.size() - 1).setDigest(currentPartMessageDigest.digest());
+      }
+    }
+    stream.close();
+  }
+
+  private static void logS3RequestAndThrow(UncheckedIOException uncheckedIOException, String requestString) {
+    LOG.error("S3 Request Failure: {}", requestString);
+    throw uncheckedIOException;

Review comment:
       Makes sense, I can let the callers throw the exceptions directly.
   
   @jackye1995 as for the log message, the sensitive parts of the request, like `SSECustomerKey`, are already redacted. I also feel that seeing just a bare exception like `ProtocolException` without much information only makes things confusing for end users.

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -172,15 +187,23 @@ public void write(byte[] b, int off, int len) throws IOException {
 
   private void newStream() throws IOException {
     if (stream != null) {
-      stream.close();
+      closeStream();
     }
 
     createStagingDirectoryIfNotExists();
     currentStagingFile = File.createTempFile("s3fileio-", ".tmp", stagingDirectory);
     currentStagingFile.deleteOnExit();
-    stagingFiles.add(currentStagingFile);
 
-    stream = new CountingOutputStream(new BufferedOutputStream(new FileOutputStream(currentStagingFile)));
+    stagingFiles.add(new FileAndDigest(currentStagingFile));
+
+    if (isEtagCheckEnabled) {
+      currentPartMessageDigest.reset();

Review comment:
       This is what I had before, but I changed it to keep the bytes instead, as per @rdblue's comment.

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -360,4 +416,28 @@ protected void finalize() throws Throwable {
       LOG.warn("Unclosed output stream created by:\n\t{}", trace);
     }
   }
+
+  private static class FileAndDigest {
+    private final File file;
+    private byte[] digest;
+    private boolean isDigestSet = false;
+
+    FileAndDigest(File file) {
+      this.file = file;
+    }
+
+    File file() {
+      return file;
+    }
+
+    void setDigest(byte[] digest) {

Review comment:
       It needs to be mutable if we are not passing MessageDigest. Earlier I had it mutable as I was passing MessageDigest, which I was able to update while bytes are written. File is created in newStream, but digest bytes are available later when stream is closed.

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -218,41 +241,50 @@ private void uploadParts() {
 
     stagingFiles.stream()
         // do not upload the file currently being written
-        .filter(f -> closed || !f.equals(currentStagingFile))
+        .filter(f -> closed || !f.file().equals(currentStagingFile))
         // do not upload any files that have already been processed
-        .filter(Predicates.not(multiPartMap::containsKey))
-        .forEach(f -> {
+        .filter(Predicates.not(f -> multiPartMap.containsKey(f.file())))
+        .forEach(fileAndDigest -> {
+          File file = fileAndDigest.file();

Review comment:
       Using `f` would lead to a `LocalVariableName` checkstyle failure. Previous discussion here: https://github.com/apache/iceberg/pull/3813#discussion_r788252447






[GitHub] [iceberg] rdblue commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r789133077



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -218,41 +241,50 @@ private void uploadParts() {
 
     stagingFiles.stream()
         // do not upload the file currently being written
-        .filter(f -> closed || !f.equals(currentStagingFile))
+        .filter(f -> closed || !f.file().equals(currentStagingFile))
         // do not upload any files that have already been processed
-        .filter(Predicates.not(multiPartMap::containsKey))
-        .forEach(f -> {
+        .filter(Predicates.not(f -> multiPartMap.containsKey(f.file())))
+        .forEach(fileAndDigest -> {
+          File file = fileAndDigest.file();
           UploadPartRequest.Builder requestBuilder = UploadPartRequest.builder()
               .bucket(location.bucket())
               .key(location.key())
               .uploadId(multipartUploadId)
-              .partNumber(stagingFiles.indexOf(f) + 1)
-              .contentLength(f.length());
+              .partNumber(stagingFiles.indexOf(fileAndDigest) + 1)
+              .contentLength(file.length());
+
+          if (isEtagCheckEnabled) {
+            requestBuilder.contentMD5(BinaryUtils.toBase64(fileAndDigest.getDigest()));
+          }
 
           S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
 
           UploadPartRequest uploadRequest = requestBuilder.build();
 
           CompletableFuture<CompletedPart> future = CompletableFuture.supplyAsync(
               () -> {
-                UploadPartResponse response = s3.uploadPart(uploadRequest, RequestBody.fromFile(f));
+                UploadPartResponse response = null;
+                try {
+                  response = s3.uploadPart(uploadRequest, RequestBody.fromFile(file));
+                } catch (UncheckedIOException uncheckedIOException) {
+                  logS3RequestAndThrow(uncheckedIOException, uploadRequest.toString());
+                }
                 return CompletedPart.builder().eTag(response.eTag()).partNumber(uploadRequest.partNumber()).build();
               },
               executorService
           ).whenComplete((result, thrown) -> {
             try {
-              Files.deleteIfExists(f.toPath());
+              Files.deleteIfExists(file.toPath());
             } catch (IOException e) {
-              LOG.warn("Failed to delete staging file: {}", f, e);
+              LOG.warn("Failed to delete staging file: {}", file, e);
             }
 
             if (thrown != null) {
               LOG.error("Failed to upload part: {}", uploadRequest, thrown);
               abortUpload();
             }
           });
-
-          multiPartMap.put(f, future);
+          multiPartMap.put(file, future);

Review comment:
       Can you revert the unnecessary whitespace change here?






[GitHub] [iceberg] rdblue commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r789132412



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -178,7 +208,13 @@ private void newStream() throws IOException {
     createStagingDirectoryIfNotExists();
     currentStagingFile = File.createTempFile("s3fileio-", ".tmp", stagingDirectory);
     currentStagingFile.deleteOnExit();
-    stagingFiles.add(currentStagingFile);
+    try {
+      currentPartMessageDigest = isEtagCheckEnabled ? MessageDigest.getInstance(digestAlgorithm) : null;
+    } catch (NoSuchAlgorithmException e) {
+      throw new IOException(e);

Review comment:
       We don't want to change the signature, but we also don't want to throw some exception just because it was convenient. The best thing to do is to wrap in RuntimeException, which avoids throwing a misleading exception class.
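A minimal sketch of that pattern, assuming a small factory method (the class and method names are hypothetical): the checked `NoSuchAlgorithmException` is wrapped in a `RuntimeException` that carries context, so the enclosing method's signature stays unchanged and no misleading `IOException` is thrown.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class DigestFactorySketch {
  // Wraps the checked exception with context instead of rethrowing it as IOException.
  static MessageDigest newDigest(String algorithm) {
    try {
      return MessageDigest.getInstance(algorithm);
    } catch (NoSuchAlgorithmException e) {
      throw new RuntimeException("Failed to create message digest for algorithm: " + algorithm, e);
    }
  }

  public static void main(String[] args) {
    System.out.println(newDigest("MD5").getAlgorithm());
    try {
      newDigest("no-such-algorithm");
    } catch (RuntimeException e) {
      System.out.println(e.getMessage());
    }
  }
}
```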






[GitHub] [iceberg] SinghAsDev commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
SinghAsDev commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r789168709



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -311,16 +344,39 @@ private void completeUploads() {
           .bucket(location.bucket())
           .key(location.key());
 
+      if (isEtagCheckEnabled) {
+        requestBuilder.contentMD5(BinaryUtils.toBase64(completeMessageDigest.digest()));
+      }
+
       S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
       S3RequestUtil.configurePermission(awsProperties, requestBuilder);
 
-      s3.putObject(requestBuilder.build(), RequestBody.fromInputStream(contentStream, contentLength));
+      PutObjectRequest putObjectRequest = requestBuilder.build();
+      try {
+        s3.putObject(putObjectRequest, RequestBody.fromInputStream(contentStream, contentLength));
+      } catch (UncheckedIOException uncheckedIOException) {
+        logS3RequestAndThrow(uncheckedIOException, putObjectRequest.toString());
+      }
     } else {
       uploadParts();
       completeMultiPartUpload();
     }
   }
 
+  private void closeStream() throws IOException {
+    if (isEtagCheckEnabled) {
+      if (stagingFiles.size() > 0) {
+        stagingFiles.get(stagingFiles.size() - 1).setDigest(currentPartMessageDigest.digest());
+      }
+    }
+    stream.close();
+  }
+
+  private static void logS3RequestAndThrow(UncheckedIOException uncheckedIOException, String requestString) {
+    LOG.error("S3 Request Failure: {}", requestString);
+    throw uncheckedIOException;

Review comment:
       Makes sense, I can let the callers throw the exceptions directly. 
   
   @jackye1995 as for the log message, the sensitive parts of the request, such as `SSECustomerKey`, are already redacted. I also feel that seeing a bare exception like `ProtocolException` without much context only confuses end users.






[GitHub] [iceberg] jackye1995 commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r781457888



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -216,43 +247,43 @@ private void uploadParts() {
       return;
     }
 
-    stagingFiles.stream()
+    stagingFilesWithETags.stream()
         // do not upload the file currently being written
-        .filter(f -> closed || !f.equals(currentStagingFile))
+        .filter(f -> closed || !f.file().equals(currentStagingFile))
         // do not upload any files that have already been processed
-        .filter(Predicates.not(multiPartMap::containsKey))
+        .filter(Predicates.not(f -> multiPartMap.containsKey(f.file())))
         .forEach(f -> {
           UploadPartRequest.Builder requestBuilder = UploadPartRequest.builder()
               .bucket(location.bucket())
               .key(location.key())
               .uploadId(multipartUploadId)
-              .partNumber(stagingFiles.indexOf(f) + 1)
-              .contentLength(f.length());
+              .partNumber(stagingFilesWithETags.indexOf(f) + 1)
+              .contentLength(f.file().length());
 
           S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
 
           UploadPartRequest uploadRequest = requestBuilder.build();
 
           CompletableFuture<CompletedPart> future = CompletableFuture.supplyAsync(
               () -> {
-                UploadPartResponse response = s3.uploadPart(uploadRequest, RequestBody.fromFile(f));
+                UploadPartResponse response = s3.uploadPart(uploadRequest, RequestBody.fromFile(f.file()));
+                checkEtag(f.eTag(), response.eTag());

Review comment:
       The integration tests in `/aws/src/integration` were added for this purpose, so I think we can leverage them.
   
   Unfortunately they are not run as part of CI yet because they require a connection to an AWS account; we are working on getting one for Iceberg. If you do not have any AWS infrastructure that could be used to run them, I can run them for you after you publish your tests in this PR.






[GitHub] [iceberg] danielcweeks commented on pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#issuecomment-1021438948


   LGTM, just waiting on checks




[GitHub] [iceberg] danielcweeks commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r791940872



##########
File path: aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
##########
@@ -209,6 +209,12 @@
    */
   public static final String CLIENT_ASSUME_ROLE_REGION = "client.assume-role.region";
 
+  /**
+   * Enables eTag checks for S3 PUT and MULTIPART upload requests.
+   */
+  public static final String S3_CHECKSUM_ENABLED = "s3.checksum.enabled";

Review comment:
       I know the conventions for property naming are a little mixed, but I believe this should be `s3.checksum-enabled`
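As a rough illustration of how a flag with this name might be read from a properties map, here is a minimal sketch in plain Java. The class name, the helper `isChecksumEnabled`, and the default of `false` are assumptions made for the example and are not taken from the PR.

```java
import java.util.HashMap;
import java.util.Map;

class ChecksumPropertySketch {
  // Property key using the naming suggested in this comment.
  static final String S3_CHECKSUM_ENABLED = "s3.checksum-enabled";
  // Assumed default, chosen for this sketch only.
  static final boolean S3_CHECKSUM_ENABLED_DEFAULT = false;

  // Hypothetical helper: returns the flag value, falling back to the default when unset.
  static boolean isChecksumEnabled(Map<String, String> properties) {
    String value = properties.get(S3_CHECKSUM_ENABLED);
    return value == null ? S3_CHECKSUM_ENABLED_DEFAULT : Boolean.parseBoolean(value);
  }

  public static void main(String[] args) {
    Map<String, String> properties = new HashMap<>();
    System.out.println(isChecksumEnabled(properties));
    properties.put(S3_CHECKSUM_ENABLED, "true");
    System.out.println(isChecksumEnabled(properties));
  }
}
```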






[GitHub] [iceberg] rdblue commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r788253472



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -311,16 +356,32 @@ private void completeUploads() {
           .bucket(location.bucket())
           .key(location.key());
 
+      if (isEtagCheckEnabled) {
+        requestBuilder.contentMD5(BinaryUtils.toBase64(completeMessageDigest.digest()));
+      }
+
       S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
       S3RequestUtil.configurePermission(awsProperties, requestBuilder);
 
-      s3.putObject(requestBuilder.build(), RequestBody.fromInputStream(contentStream, contentLength));
+      PutObjectRequest putObjectRequest = requestBuilder.build();
+      try {
+        s3.putObject(putObjectRequest, RequestBody.fromInputStream(contentStream, contentLength));
+      } catch (UncheckedIOException uncheckedIOException) {
+        checkProtocolException(uncheckedIOException, putObjectRequest.toString());
+      }
     } else {
       uploadParts();
       completeMultiPartUpload();
     }
   }
 
+  private static void checkProtocolException(UncheckedIOException uncheckedIOException, String requestString) {
+    if (uncheckedIOException.getCause() instanceof ProtocolException) {
+      LOG.error("S3 Request Failure: {}", requestString);
+      throw uncheckedIOException;

Review comment:
       This only throws protocol exceptions? What is being suppressed by this? Is there a guarantee that this is safe?
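The concern can be reproduced with a small standalone sketch. It mirrors only the shape of the helper in the diff; the class name, the `upload` wrapper, and the simulated failures are hypothetical stand-ins for the S3 call, not code from the PR.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ProtocolException;

class ConditionalRethrowSketch {
  // Same shape as the helper under review: rethrows only when the cause is a ProtocolException.
  static void checkProtocolException(UncheckedIOException e) {
    if (e.getCause() instanceof ProtocolException) {
      throw e;
    }
    // Any other cause falls through here and is silently dropped.
  }

  // Hypothetical caller: returns true whenever no exception escapes.
  static boolean upload(Runnable putObject) {
    try {
      putObject.run();
    } catch (UncheckedIOException e) {
      checkProtocolException(e);
    }
    return true;
  }

  public static void main(String[] args) {
    // Simulated failure whose cause is not a ProtocolException.
    boolean reported = upload(() -> {
      throw new UncheckedIOException(new IOException("connection reset"));
    });
    System.out.println("upload reported success: " + reported);
  }
}
```

In this sketch the failed upload is reported as successful, which is the suppression being asked about. Letting the original exception propagate unconditionally avoids it.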






[GitHub] [iceberg] rdblue commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r789817461



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -74,13 +79,16 @@
   private final AwsProperties awsProperties;
 
   private CountingOutputStream stream;
-  private final List<File> stagingFiles = Lists.newArrayList();
+  private final List<FileAndDigest> stagingFiles = Lists.newArrayList();
   private final File stagingDirectory;
   private File currentStagingFile;
   private String multipartUploadId;
   private final Map<File, CompletableFuture<CompletedPart>> multiPartMap = Maps.newHashMap();
   private final int multiPartSize;
   private final int multiPartThresholdSize;
+  private final boolean isEtagCheckEnabled;
+  private final MessageDigest completeMessageDigest;
+  private final MessageDigest currentPartMessageDigest;

Review comment:
       Can we recalculate the digest if we switch to multipart? Or can we at least stop using the complete message digest after switching to multipart?






[GitHub] [iceberg] SinghAsDev commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
SinghAsDev commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r789838750



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -218,32 +241,43 @@ private void uploadParts() {
 
     stagingFiles.stream()
         // do not upload the file currently being written
-        .filter(f -> closed || !f.equals(currentStagingFile))
+        .filter(f -> closed || !f.file().equals(currentStagingFile))
         // do not upload any files that have already been processed
-        .filter(Predicates.not(multiPartMap::containsKey))
-        .forEach(f -> {
+        .filter(Predicates.not(f -> multiPartMap.containsKey(f.file())))
+        .forEach(fileAndDigest -> {
+          File file = fileAndDigest.file();
           UploadPartRequest.Builder requestBuilder = UploadPartRequest.builder()
               .bucket(location.bucket())
               .key(location.key())
               .uploadId(multipartUploadId)
-              .partNumber(stagingFiles.indexOf(f) + 1)
-              .contentLength(f.length());
+              .partNumber(stagingFiles.indexOf(fileAndDigest) + 1)
+              .contentLength(file.length());
+
+          if (fileAndDigest.hasDigest()) {
+            requestBuilder.contentMD5(BinaryUtils.toBase64(fileAndDigest.getDigest()));
+          }
 
           S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
 
           UploadPartRequest uploadRequest = requestBuilder.build();
 
           CompletableFuture<CompletedPart> future = CompletableFuture.supplyAsync(
               () -> {
-                UploadPartResponse response = s3.uploadPart(uploadRequest, RequestBody.fromFile(f));
+                UploadPartResponse response = null;
+                try {
+                  response = s3.uploadPart(uploadRequest, RequestBody.fromFile(file));
+                } catch (UncheckedIOException uncheckedIOException) {
+                  throw new UncheckedIOException(

Review comment:
       The security concern is mostly about the S3 request info. The previous thread is here: https://github.com/apache/iceberg/pull/3813#discussion_r789155286






[GitHub] [iceberg] rdblue commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r789134901



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -311,16 +344,39 @@ private void completeUploads() {
           .bucket(location.bucket())
           .key(location.key());
 
+      if (isEtagCheckEnabled) {
+        requestBuilder.contentMD5(BinaryUtils.toBase64(completeMessageDigest.digest()));
+      }
+
       S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
       S3RequestUtil.configurePermission(awsProperties, requestBuilder);
 
-      s3.putObject(requestBuilder.build(), RequestBody.fromInputStream(contentStream, contentLength));
+      PutObjectRequest putObjectRequest = requestBuilder.build();
+      try {
+        s3.putObject(putObjectRequest, RequestBody.fromInputStream(contentStream, contentLength));
+      } catch (UncheckedIOException uncheckedIOException) {
+        logS3RequestAndThrow(uncheckedIOException, putObjectRequest.toString());
+      }
     } else {
       uploadParts();
       completeMultiPartUpload();
     }
   }
 
+  private void closeStream() throws IOException {
+    if (isEtagCheckEnabled) {
+      if (stagingFiles.size() > 0) {
+        stagingFiles.get(stagingFiles.size() - 1).setDigest(currentPartMessageDigest.digest());
+      }
+    }
+    stream.close();
+  }
+
+  private static void logS3RequestAndThrow(UncheckedIOException uncheckedIOException, String requestString) {
+    LOG.error("S3 Request Failure: {}", requestString);
+    throw uncheckedIOException;

Review comment:
       I don't think this method is needed. If the exception is going to be thrown, then it is the responsibility of whatever catches it to log it or handle it. This code doesn't know whether it can simply be handled, which is why it re-throws. So it should also not decide that this needs to be logged.
   
   Patterns like this usually lead to misleading errors or red herrings in logs.






[GitHub] [iceberg] SinghAsDev commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
SinghAsDev commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r789182884



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -360,4 +416,28 @@ protected void finalize() throws Throwable {
       LOG.warn("Unclosed output stream created by:\n\t{}", trace);
     }
   }
+
+  private static class FileAndDigest {

Review comment:
       We can do that for multipart requests, but for a PUT request it is a bit awkward, as we would either have to check all files or just rely on the existing etag flag. Nevertheless, I will update it to use `hasDigest` in the places where we can.






[GitHub] [iceberg] danielcweeks commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r790323941



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -74,13 +79,16 @@
   private final AwsProperties awsProperties;
 
   private CountingOutputStream stream;
-  private final List<File> stagingFiles = Lists.newArrayList();
+  private final List<FileAndDigest> stagingFiles = Lists.newArrayList();
   private final File stagingDirectory;
   private File currentStagingFile;
   private String multipartUploadId;
   private final Map<File, CompletableFuture<CompletedPart>> multiPartMap = Maps.newHashMap();
   private final int multiPartSize;
   private final int multiPartThresholdSize;
+  private final boolean isEtagCheckEnabled;
+  private final MessageDigest completeMessageDigest;
+  private final MessageDigest currentPartMessageDigest;

Review comment:
       I don't think we want to recalculate if that requires rereading the file to compute the checksum. I still feel that pushing the per-file digest into `FileAndDigest` is the best approach, while keeping the full-file digest for the put object upload as well. I think we're over-optimizing by trying to stop the full-file calculation, and we could always log the full-file MD5.
   
   There's another option that might simplify things but would require a larger change involving uploads. If we simply make the first part of the multipart upload larger (multipart + threshold), then each `FileAndDigest` would have a digest associated with it that could be used for either the put object or the put part upload.
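A minimal sketch of the per-file digest idea follows. The accessor names `file()`, `setDigest()`, and `hasDigest()` appear in the diffs in this thread, but the class body, the `base64Digest()` helper, and the in-memory part contents are assumptions made for illustration.

```java
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;

class FileAndDigestSketch {
  private final File file;
  private byte[] digest; // stays null until the part is finished

  FileAndDigestSketch(File file) {
    this.file = file;
  }

  File file() {
    return file;
  }

  void setDigest(byte[] newDigest) {
    this.digest = newDigest.clone();
  }

  boolean hasDigest() {
    return digest != null;
  }

  // Hypothetical helper: Base64 form, as used for a Content-MD5 header value.
  String base64Digest() {
    return Base64.getEncoder().encodeToString(digest);
  }

  public static void main(String[] args) throws NoSuchAlgorithmException {
    List<FileAndDigestSketch> parts = new ArrayList<>();
    String[] contents = {"part-1 bytes", "part-2 bytes"};
    for (int i = 0; i < contents.length; i++) {
      // The file is only a label here; nothing is written to disk in this sketch.
      FileAndDigestSketch part = new FileAndDigestSketch(new File("staging-" + i + ".tmp"));
      // A fresh digest per part, so each part carries its own MD5.
      MessageDigest md5 = MessageDigest.getInstance("MD5");
      part.setDigest(md5.digest(contents[i].getBytes(StandardCharsets.UTF_8)));
      parts.add(part);
    }
    for (FileAndDigestSketch part : parts) {
      System.out.println(part.file().getName() + " -> " + part.base64Digest());
    }
  }
}
```

With every staged file carrying its own digest, the upload code can check `hasDigest()` per part instead of consulting a separate flag.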






[GitHub] [iceberg] kbendick commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r780534493



##########
File path: aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
##########
@@ -209,6 +209,12 @@
    */
   public static final String CLIENT_ASSUME_ROLE_REGION = "client.assume-role.region";
 
+  /**
+   * Enables eTag checks for S3 PUT and MULTIPART upload requests.
+   */
+  public static final String CLIENT_ENABLE_ETAG_CHECK = "client.enable.etag-check";

Review comment:
       I'm a big fan of `s3` as the prefix, and `s3.checksum.enabled` would be my preferred choice.
   
   This really explains what the intent is, whereas `etag-check` explains how it works and isn't as informative for users who don't know the underlying details of S3.






[GitHub] [iceberg] SinghAsDev commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
SinghAsDev commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r779812163



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -216,43 +247,43 @@ private void uploadParts() {
       return;
     }
 
-    stagingFiles.stream()
+    stagingFilesWithETags.stream()
         // do not upload the file currently being written
-        .filter(f -> closed || !f.equals(currentStagingFile))
+        .filter(f -> closed || !f.file().equals(currentStagingFile))
         // do not upload any files that have already been processed
-        .filter(Predicates.not(multiPartMap::containsKey))
+        .filter(Predicates.not(f -> multiPartMap.containsKey(f.file())))
         .forEach(f -> {
           UploadPartRequest.Builder requestBuilder = UploadPartRequest.builder()
               .bucket(location.bucket())
               .key(location.key())
               .uploadId(multipartUploadId)
-              .partNumber(stagingFiles.indexOf(f) + 1)
-              .contentLength(f.length());
+              .partNumber(stagingFilesWithETags.indexOf(f) + 1)
+              .contentLength(f.file().length());
 
           S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
 
           UploadPartRequest uploadRequest = requestBuilder.build();
 
           CompletableFuture<CompletedPart> future = CompletableFuture.supplyAsync(
               () -> {
-                UploadPartResponse response = s3.uploadPart(uploadRequest, RequestBody.fromFile(f));
+                UploadPartResponse response = s3.uploadPart(uploadRequest, RequestBody.fromFile(f.file()));
+                checkEtag(f.eTag(), response.eTag());

Review comment:
       @jackye1995 thoughts?






[GitHub] [iceberg] danielcweeks commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r788357091



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -74,13 +78,16 @@
   private final AwsProperties awsProperties;
 
   private CountingOutputStream stream;
-  private final List<File> stagingFiles = Lists.newArrayList();
+  private final List<FileAndDigest> stagingFiles = Lists.newArrayList();
   private final File stagingDirectory;
   private File currentStagingFile;
   private String multipartUploadId;
   private final Map<File, CompletableFuture<CompletedPart>> multiPartMap = Maps.newHashMap();
   private final int multiPartSize;
   private final int multiPartThresholdSize;
+  private final boolean isEtagCheckEnabled;
+  private final MessageDigest completeMessageDigest;
+  private MessageDigest currentPartMessageDigest;

Review comment:
       Rather than computing all of the digests ourselves, it might be cleaner to wrap the streams in `java.security.DigestOutputStream` and delegate to that in order to calculate the etag.  I'm not sure if that approach can be used for the complete message (since the parts are uploaded independently), but I also question whether we really need to validate the full checksum if we get all of the parts verified.  
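For reference, the `DigestOutputStream` approach could look roughly like the sketch below. It writes to an in-memory sink instead of a staging file, and the class and method names are hypothetical; only `java.security.DigestOutputStream`, `MessageDigest`, and `java.util.Base64` are real JDK APIs.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

class DigestStreamSketch {
  // Writes data through a DigestOutputStream and returns the Base64-encoded MD5,
  // which is the encoding used for a Content-MD5 header value.
  static String writeAndDigest(OutputStream sink, byte[] data)
      throws IOException, NoSuchAlgorithmException {
    MessageDigest md5 = MessageDigest.getInstance("MD5");
    try (DigestOutputStream out = new DigestOutputStream(sink, md5)) {
      // The digest is updated as a side effect of every write.
      out.write(data, 0, data.length);
    }
    return Base64.getEncoder().encodeToString(md5.digest());
  }

  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    byte[] data = "hello world".getBytes(StandardCharsets.UTF_8);
    System.out.println(writeAndDigest(sink, data));
  }
}
```

Because the digest is updated inside the stream wrapper, the writing code does not need explicit `update` calls. A fresh wrapper (or a `reset()` on the digest) would be needed for each new part.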






[GitHub] [iceberg] rdblue commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r790319346



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -218,32 +241,43 @@ private void uploadParts() {
 
     stagingFiles.stream()
         // do not upload the file currently being written
-        .filter(f -> closed || !f.equals(currentStagingFile))
+        .filter(f -> closed || !f.file().equals(currentStagingFile))
         // do not upload any files that have already been processed
-        .filter(Predicates.not(multiPartMap::containsKey))
-        .forEach(f -> {
+        .filter(Predicates.not(f -> multiPartMap.containsKey(f.file())))
+        .forEach(fileAndDigest -> {
+          File file = fileAndDigest.file();
           UploadPartRequest.Builder requestBuilder = UploadPartRequest.builder()
               .bucket(location.bucket())
               .key(location.key())
               .uploadId(multipartUploadId)
-              .partNumber(stagingFiles.indexOf(f) + 1)
-              .contentLength(f.length());
+              .partNumber(stagingFiles.indexOf(fileAndDigest) + 1)
+              .contentLength(file.length());
+
+          if (fileAndDigest.hasDigest()) {
+            requestBuilder.contentMD5(BinaryUtils.toBase64(fileAndDigest.getDigest()));
+          }
 
           S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
 
           UploadPartRequest uploadRequest = requestBuilder.build();
 
           CompletableFuture<CompletedPart> future = CompletableFuture.supplyAsync(
               () -> {
-                UploadPartResponse response = s3.uploadPart(uploadRequest, RequestBody.fromFile(f));
+                UploadPartResponse response = null;
+                try {
+                  response = s3.uploadPart(uploadRequest, RequestBody.fromFile(file));
+                } catch (UncheckedIOException uncheckedIOException) {
+                  throw new UncheckedIOException(

Review comment:
       Thanks for the info. I wouldn't worry about it. There is no need for error handling here so let's just let the exception propagate and we can worry about that concern in more appropriate places.






[GitHub] [iceberg] danielcweeks commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r790324216



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -172,15 +187,23 @@ public void write(byte[] b, int off, int len) throws IOException {
 
   private void newStream() throws IOException {
     if (stream != null) {
-      stream.close();
+      closeStream();
     }
 
     createStagingDirectoryIfNotExists();
     currentStagingFile = File.createTempFile("s3fileio-", ".tmp", stagingDirectory);
     currentStagingFile.deleteOnExit();
-    stagingFiles.add(currentStagingFile);
 
-    stream = new CountingOutputStream(new BufferedOutputStream(new FileOutputStream(currentStagingFile)));
+    stagingFiles.add(new FileAndDigest(currentStagingFile));
+
+    if (isEtagCheckEnabled) {
+      currentPartMessageDigest.reset();

Review comment:
       I don't think we need to worry too much about the extra bytes necessary to store the digest.  Readability/maintainability is much more important than that little bit of efficiency.






[GitHub] [iceberg] rdblue commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r789819029



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -218,41 +241,50 @@ private void uploadParts() {
 
     stagingFiles.stream()
         // do not upload the file currently being written
-        .filter(f -> closed || !f.equals(currentStagingFile))
+        .filter(f -> closed || !f.file().equals(currentStagingFile))
         // do not upload any files that have already been processed
-        .filter(Predicates.not(multiPartMap::containsKey))
-        .forEach(f -> {
+        .filter(Predicates.not(f -> multiPartMap.containsKey(f.file())))
+        .forEach(fileAndDigest -> {
+          File file = fileAndDigest.file();

Review comment:
       I'd rather suppress the checkstyle rule and have fewer changes.






[GitHub] [iceberg] rdblue commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r789136148



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -74,13 +79,16 @@
   private final AwsProperties awsProperties;
 
   private CountingOutputStream stream;
-  private final List<File> stagingFiles = Lists.newArrayList();
+  private final List<FileAndDigest> stagingFiles = Lists.newArrayList();
   private final File stagingDirectory;
   private File currentStagingFile;
   private String multipartUploadId;
   private final Map<File, CompletableFuture<CompletedPart>> multiPartMap = Maps.newHashMap();
   private final int multiPartSize;
   private final int multiPartThresholdSize;
+  private final boolean isEtagCheckEnabled;
+  private final MessageDigest completeMessageDigest;
+  private final MessageDigest currentPartMessageDigest;

Review comment:
       It looks like the purpose of having both digests is to handle the cases where either multi-part or PUT is used. Right?
   
   Do we need both, or can we reuse the first part message digest?
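   For context on why a whole-object digest and a per-part digest can both come up, here is a minimal sketch of how the two kinds of eTag are commonly derived. It assumes the widely observed S3 behavior for objects that are not encrypted with SSE-KMS or SSE-C: a single PUT yields the hex MD5 of the content, while a multipart upload yields the MD5 of the concatenated per-part MD5 digests followed by `-<part count>`. The class and method names (`EtagSketch`, `putEtag`, `multipartEtag`) are hypothetical and are not part of this PR.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;

// Hypothetical helper, not part of S3OutputStream.
final class EtagSketch {
  private EtagSketch() {
  }

  private static MessageDigest md5() {
    try {
      return MessageDigest.getInstance("MD5");
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 is not available", e);
    }
  }

  private static String hex(byte[] bytes) {
    StringBuilder sb = new StringBuilder();
    for (byte b : bytes) {
      sb.append(String.format("%02x", b));
    }
    return sb.toString();
  }

  // Expected eTag for a single PUT (assumption: no SSE-KMS / SSE-C).
  static String putEtag(byte[] content) {
    return hex(md5().digest(content));
  }

  // Expected eTag for a multipart upload: MD5 over the concatenated
  // binary MD5 digests of each part, suffixed with the part count.
  static String multipartEtag(List<byte[]> parts) {
    MessageDigest outer = md5();
    for (byte[] part : parts) {
      outer.update(md5().digest(part));
    }
    return hex(outer.digest()) + "-" + parts.size();
  }
}
```

   Under these assumptions, the same bytes produce different eTags depending on the upload path, which is why a digest accumulated across the whole stream cannot simply be swapped for the first part's digest once a second part exists.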






[GitHub] [iceberg] rdblue commented on a change in pull request #3813: [S3FileIO] Add capability to perform checksum validations using S3 eTags

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3813:
URL: https://github.com/apache/iceberg/pull/3813#discussion_r791973390



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -178,9 +192,27 @@ private void newStream() throws IOException {
     createStagingDirectoryIfNotExists();
     currentStagingFile = File.createTempFile("s3fileio-", ".tmp", stagingDirectory);
     currentStagingFile.deleteOnExit();
-    stagingFiles.add(currentStagingFile);
+    try {
+      currentPartMessageDigest = isChecksumEnabled ? MessageDigest.getInstance(digestAlgorithm) : null;
+    } catch (NoSuchAlgorithmException e) {
+      throw new RuntimeException("Failed to create message digest needed for s3 checksum checks.", e);
+    }
+
+    stagingFiles.add(new FileAndDigest(currentStagingFile, currentPartMessageDigest));
+
+    if (isChecksumEnabled) {
+      DigestOutputStream digestOutputStream = new DigestOutputStream(new DigestOutputStream(new BufferedOutputStream(
+          new FileOutputStream(currentStagingFile)), currentPartMessageDigest), completeMessageDigest);
 
-    stream = new CountingOutputStream(new BufferedOutputStream(new FileOutputStream(currentStagingFile)));
+      // if switched over to multipart threshold already, no need to update complete message digest
+      if (multipartUploadId != null) {

Review comment:
       Isn't this known before we create the double `DigestOutputStream`? I think you could just check this above and create a single-wrapped stream instead.
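   A rough sketch of the single-wrap idea, assuming the "already switched to multipart" signal (`multipartUploadId != null` in the diff) is available before the stream is built. The `DigestWrapSketch` class, its `wrap` method, and the `multipartStarted` flag are hypothetical names used only for illustration; an in-memory sink stands in for the staging file.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper, not part of S3OutputStream.
final class DigestWrapSketch {
  private DigestWrapSketch() {
  }

  static MessageDigest md5() {
    try {
      return MessageDigest.getInstance("MD5");
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 is not available", e);
    }
  }

  // Decide up front how many digests the new stream should feed.
  static OutputStream wrap(OutputStream sink, MessageDigest partDigest,
                           MessageDigest completeDigest, boolean multipartStarted) {
    DigestOutputStream partStream = new DigestOutputStream(sink, partDigest);
    if (multipartStarted) {
      // the whole-object digest is no longer needed, so wrap once
      return partStream;
    }
    return new DigestOutputStream(partStream, completeDigest);
  }

  private static String hex(byte[] bytes) {
    StringBuilder sb = new StringBuilder();
    for (byte b : bytes) {
      sb.append(String.format("%02x", b));
    }
    return sb.toString();
  }

  // Usage: returns {part digest, complete digest} as hex after writing data.
  static String[] digestsAfterWrite(byte[] data, boolean multipartStarted) {
    MessageDigest part = md5();
    MessageDigest complete = md5();
    try (OutputStream out = wrap(new ByteArrayOutputStream(), part, complete, multipartStarted)) {
      out.write(data);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return new String[] {hex(part.digest()), hex(complete.digest())};
  }
}
```

   With `multipartStarted` set to true, only the part digest sees the bytes and the complete digest stays at its initial state, so there is no need to build the double wrapper and then switch one digest off afterwards.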



