Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/07/29 17:46:29 UTC

[GitHub] [iceberg] abmo-x opened a new pull request, #5311: S3OutputStream - failure to close should persist on subsequent close calls

abmo-x opened a new pull request, #5311:
URL: https://github.com/apache/iceberg/pull/5311

   Fix for https://github.com/apache/iceberg/issues/5310 and https://github.com/apache/iceberg/issues/4168
   
   **Issue**
   
   When S3OutputStream fails to upload a file on a call to **close**, IcebergStreamWriter in Flink still ends up adding the file to completedDataFiles in **BaseTaskWriter**, resulting in table metadata that points to an S3 data file which was never uploaded to S3.
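
   For illustration, the failure mode boils down to a swallowed close failure followed by a "successful" second close. The sketch below is self-contained and uses a plain OutputStream as a stand-in for the actual Iceberg classes:

    ```java
    import java.io.IOException;
    import java.io.OutputStream;

    class SwallowedCloseFailure {
      static void writeAndClose(OutputStream s3Stream) {
        try {
          s3Stream.close();   // the upload (putObject / completeMultipartUpload) fails here
        } catch (IOException e) {
          // user code (for example a catch-all ProcessFunction) swallows the failure
        }

        try {
          s3Stream.close();   // before this fix: returns quietly, so the caller records the file
        } catch (IOException e) {
          // after this fix: the original failure is rethrown, so the file is never recorded
        }
      }
    }
    ```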
   
   **Steps to Reproduce**
   
   -  Flink 1.14 pipeline with Iceberg 0.13
   -  Customer-implemented **ProcessFunction<FlinkRecord, Row>** with a catch-all exception handler in **processElement** (a sketch of this pattern follows this list)
         - This is important because it is what leads to close() being called twice, from:
            - [shouldRollToNewFile](https://github.com/apache/iceberg/blob/bf6242fe57605a7b38b9d01ee33ae325687fb3a5/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java#L257) --> [closeCurrent](https://github.com/apache/iceberg/blob/bf6242fe57605a7b38b9d01ee33ae325687fb3a5/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java#L288)
            - [close](https://github.com/apache/iceberg/blob/bf6242fe57605a7b38b9d01ee33ae325687fb3a5/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java#L309)
   
   - Configure the pipeline to use S3FileIO, with the target file size chosen for your test data so that the writer rolls to a new file
   - Trigger an S3 failure on the putObject call (it should be reproducible for MultiPartUpload as well) during [shouldRollToNewFile](https://github.com/apache/iceberg/blob/bf6242fe57605a7b38b9d01ee33ae325687fb3a5/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java#L257), which calls [close --> completeUploads](https://github.com/apache/iceberg/blob/c8f93dfad1925c7192d2d01bbfd8b790b364b54e/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java#L242)
       
   <details>
     <summary> StackTrace from failure</summary>
   <p>
   
   ```
   org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
   	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:101)
   	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)
   	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
   	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
   	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
   	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
   	...
   	at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
   	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
   	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
   	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
   	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
   	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
   	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
   	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
   	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
   	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
   	at java.base/java.lang.Thread.run(Thread.java:829)
   Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
   	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:101)
   	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)
   	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
   	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
   	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
   	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
   	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
   	... 21 more
   Caused by: software.amazon.awssdk.core.exception.SdkClientException: Unable to load credentials from service endpoint.
   	at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:98)
   	at software.amazon.awssdk.auth.credentials.HttpCredentialsProvider.refreshCredentials(HttpCredentialsProvider.java:110)
   	at software.amazon.awssdk.utils.cache.CachedSupplier.refreshCache(CachedSupplier.java:132)
   	at software.amazon.awssdk.utils.cache.OneCallerBlocks.prefetch(OneCallerBlocks.java:38)
   	at software.amazon.awssdk.utils.cache.CachedSupplier.prefetchCache(CachedSupplier.java:116)
   	at software.amazon.awssdk.utils.cache.CachedSupplier.get(CachedSupplier.java:91)
   	at java.base/java.util.Optional.map(Optional.java:265)
   	at software.amazon.awssdk.auth.credentials.HttpCredentialsProvider.resolveCredentials(HttpCredentialsProvider.java:146)
   	at software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain.resolveCredentials(AwsCredentialsProviderChain.java:85)
   	at software.amazon.awssdk.auth.credentials.internal.LazyAwsCredentialsProvider.resolveCredentials(LazyAwsCredentialsProvider.java:45)
   	at software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.resolveCredentials(DefaultCredentialsProvider.java:104)
   	at software.amazon.awssdk.awscore.client.handler.AwsClientHandlerUtils.createExecutionContext(AwsClientHandlerUtils.java:76)
   	at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.createExecutionContext(AwsSyncClientHandler.java:68)
   	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:97)
   	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:167)
   	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:94)
   	at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
   	at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:55)
   	at software.amazon.awssdk.services.s3.DefaultS3Client.putObject(DefaultS3Client.java:8350)
   	at org.apache.iceberg.aws.s3.S3OutputStream.completeUploads(S3OutputStream.java:396)
   	at org.apache.iceberg.aws.s3.S3OutputStream.close(S3OutputStream.java:256)
   	at org.apache.parquet.io.DelegatingPositionOutputStream.close(DelegatingPositionOutputStream.java:38)
   	at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:1106)
   	at org.apache.iceberg.parquet.ParquetWriter.close(ParquetWriter.java:239)
   	at org.apache.iceberg.io.DataWriter.close(DataWriter.java:82)
   	at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.closeCurrent(BaseTaskWriter.java:288)
   	at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.write(BaseTaskWriter.java:254)
   	at org.apache.iceberg.io.PartitionedFanoutWriter.write(PartitionedFanoutWriter.java:58)
   	at org.apache.iceberg.flink.sink.IcebergStreamWriter.processElement(IcebergStreamWriter.java:74)
   	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
   	... 27 more
   Caused by: software.amazon.awssdk.core.exception.SdkServiceException: Unauthorized
   ```
   </p>
   </details>
   
   - The pipeline keeps running even after the failure above; then the [snapshot barrier](https://github.com/apache/iceberg/blob/3584c79022ec70f79b326550736b4600d249e4a2/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java#L65) gets triggered
     - This calls close again and ends up adding the data file which was never uploaded to S3
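
   The catch-all **ProcessFunction** pattern referenced above looks roughly like the sketch below (illustrative only, not the customer's code; String stands in for the FlinkRecord/Row types):

    ```java
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    public class CatchAllProcessFunction extends ProcessFunction<String, String> {

      @Override
      public void processElement(String record, Context ctx, Collector<String> out) {
        try {
          // out.collect() runs the chained IcebergStreamWriter synchronously, so a failed
          // S3OutputStream.close() surfaces here as an ExceptionInChainedOperatorException
          out.collect(record.trim());
        } catch (Exception e) {
          // catch-all handler: the close failure is swallowed and the pipeline keeps running
        }
      }
    }
    ```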
   
   **Testing**
   - Unit tests added
   - Testing on our dev pipeline; will update with the results after the pipeline has run for a while
      




[GitHub] [iceberg] abmo-x commented on a diff in pull request #5311: S3OutputStream - failure to close should persist on subsequent close calls

Posted by GitBox <gi...@apache.org>.
abmo-x commented on code in PR #5311:
URL: https://github.com/apache/iceberg/pull/5311#discussion_r935130523


##########
aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java:
##########
@@ -257,6 +258,13 @@ private void newStream() throws IOException {
 
   @Override
   public void close() throws IOException {
+
+    // A failed s3 close removes state that is required for a successful close.

Review Comment:
   Updated based on the suggestion.





[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #5311: DataWriter - failure to close should not add file to completedDataFiles

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #5311:
URL: https://github.com/apache/iceberg/pull/5311#discussion_r925962896


##########
aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java:
##########
@@ -240,6 +242,23 @@ private void newStream() throws IOException {
 
   @Override
   public void close() throws IOException {
+
+    /**
+     * As calls to close() are used to mark a stream as completed and add the file
+     * to the completed data files in BaseTaskWriter we cannot just return if this
+     * was already closed with failure to upload.
+     *
+     * As close 'completes' the upload and staging files are cleanup, any failure to
+     * upload should be persisted and thrown back to the caller to make sure
+     * the underlying file represented by this stream is not used in the completedDataFiles
+     * of a manifest.
+     * see: https://github.com/apache/iceberg/issues/5310
+      */
+    if(closeFailureState != null){
+      throw new IllegalStateException("close called on a closed stream which failed to close completely earlier at: "+closeFailureState.getFailureAt()

Review Comment:
   A few style errors on this line; Checkstyle will flag them as errors (missing space between the + and the string, and "comma after a newline"). Instructions for running our current Checkstyle rules are in the dev docs.
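
   For illustration, a Checkstyle-friendly form of the quoted guard could look like the snippet below (closeFailureState and getFailureAt() come from this earlier revision of the PR and were later reworked):

    ```java
    if (closeFailureState != null) {
      throw new IllegalStateException(
          "close called on a closed stream which failed to close completely earlier at: "
              + closeFailureState.getFailureAt());
    }
    ```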





[GitHub] [iceberg] abmo-x commented on pull request #5311: DataWriter - failure to close should not add file to completedDataFiles

Posted by GitBox <gi...@apache.org>.
abmo-x commented on PR #5311:
URL: https://github.com/apache/iceberg/pull/5311#issuecomment-1190837724

   @rdblue @RussellSpitzer 
   Added a commit to clear currentWriter on close in BaseTaskWriter (a simplified sketch follows at the end of this comment) and added 2 test cases around failure to close and complete.
   
   I agree close should only be called once, and we are relying on that behavior quite strongly when adding the data files.
   However, I have found that writers are held and closed more than once in various scenarios, which causes this issue: a close results in a failure and the writers are left in a bad state.
   
   1. When user-defined functions catch all exceptions and ignore failures on write, as seen in Flink's processElement, which internally triggers a roll to a new file.
   2. This behavior was also observed before, and a fix was made in https://github.com/apache/iceberg/pull/1749 
   
   Let me know your thoughts. 
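
   For reference, the "clear currentWriter on close" commit has roughly the following shape (a simplified sketch of BaseRollingWriter.closeCurrent, with helper names simplified, not the exact committed code):

    ```java
    private void closeCurrent() throws IOException {
      if (currentWriter != null) {
        try {
          currentWriter.close();
          // only a successfully closed writer contributes a completed data file
          complete(currentWriter);
        } finally {
          // clear the reference so a later close()/roll cannot re-close a failed writer
          currentWriter = null;
        }
      }
    }
    ```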




[GitHub] [iceberg] krisdas commented on a diff in pull request #5311: DataWriter - failure to close should not add file to completedDataFiles

Posted by GitBox <gi...@apache.org>.
krisdas commented on code in PR #5311:
URL: https://github.com/apache/iceberg/pull/5311#discussion_r926077447


##########
core/src/test/java/org/apache/iceberg/io/TestDataWriter.java:
##########
@@ -0,0 +1,97 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *   http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.iceberg.io;
+
+import org.apache.iceberg.*;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestDataWriter {
+
+    DataWriter<String> dataWriter;
+    @Mock
+    FileAppender<String> appender;
+    @Mock
+    StructLike partition;
+    @Mock
+    EncryptionKeyMetadata keyMetadata;
+    @Rule
+    public TemporaryFolder temp = new TemporaryFolder();
+    private File file;
+
+    @Before
+    public void setUp() throws IOException {
+        file = temp.newFile();
+        file.deleteOnExit();
+        dataWriter = new DataWriter<>(appender, FileFormat.PARQUET, file.getPath(), PartitionSpec.unpartitioned(), partition, keyMetadata);
+    }
+
+    @Test
+    public void write() {
+        String row = "first_row";
+        dataWriter.write(row);
+        Mockito.verify(appender, Mockito.times(1)).add(row);
+    }
+
+    @Test
+    public void close() throws IOException {
+        Mockito.when(appender.length()).thenReturn(1L);
+        Mockito.when(appender.metrics()).thenReturn(Mockito.mock(Metrics.class));
+        Mockito.when(appender.splitOffsets()).thenReturn(Arrays.asList(1L));
+        dataWriter.close();
+        DataFile dataFile = dataWriter.toDataFile();
+        Assert.assertEquals(file.getPath(), dataFile.path());
+    }
+
+    @Test(expected=IllegalStateException.class)
+    public void closeFailureShouldNotReturnDataFile() throws IOException {
+        Mockito.doThrow(new IllegalStateException("mock close failure of appender"))
+                .when(appender).close();
+        try {
+            dataWriter.close();
+        } catch (Exception e) {
+            // ignore mocked failure on first call to close
+        }
+        dataWriter.close();
+        dataWriter.toDataFile();

Review Comment:
   L89: will this line execute? L88 will always throw an exception, right?





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5311: DataWriter - failure to close should not add file to completedDataFiles

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #5311:
URL: https://github.com/apache/iceberg/pull/5311#discussion_r930379383


##########
core/src/test/java/org/apache/iceberg/io/TestDataWriter.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestDataWriter {
+
+  DataWriter<String> dataWriter;
+  @Mock
+  FileAppender<String> appender;
+  @Mock
+  StructLike partition;
+  @Mock
+  EncryptionKeyMetadata keyMetadata;
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  private File file;
+
+  @Before
+  public void before() throws IOException {
+    file = temp.newFile();
+    file.deleteOnExit();
+    dataWriter = new DataWriter<>(appender,
+        FileFormat.PARQUET,
+        file.getPath(),
+        PartitionSpec.unpartitioned(),
+        partition,
+        keyMetadata);
+  }
+
+  @Test
+  public void write() {
+    String row = "first_row";
+    dataWriter.write(row);
+    Mockito.verify(appender, Mockito.times(1)).add(row);
+  }
+
+  @Test
+  public void close() throws IOException {
+    Mockito.when(appender.length()).thenReturn(1L);
+    Mockito.when(appender.metrics()).thenReturn(Mockito.mock(Metrics.class));
+    Mockito.when(appender.splitOffsets()).thenReturn(Arrays.asList(1L));
+    dataWriter.close();
+    DataFile dataFile = dataWriter.toDataFile();
+    Assert.assertEquals(file.getPath(), dataFile.path());
+  }
+
+  @Test
+  public void closeFailureShouldNotReturnDataFile() throws IOException {
+    Mockito.doThrow(new IllegalStateException("mock close failure of appender"))
+        .when(appender).close();
+    Assertions.assertThatThrownBy(() -> {

Review Comment:
   nit: it is unclear whether exception is expected from `dataWriter.close()` or `dataWriter.toDataFile()`
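
   One way to make the two expectations explicit, reusing the appender/dataWriter fixtures from the quoted test (a sketch, not necessarily the version that was merged):

    ```java
    @Test
    public void closeFailureShouldNotReturnDataFile() throws IOException {
      Mockito.doThrow(new IllegalStateException("mock close failure of appender"))
          .when(appender).close();

      // the first close is expected to rethrow the mocked appender failure
      Assertions.assertThatThrownBy(() -> dataWriter.close())
          .isInstanceOf(IllegalStateException.class)
          .hasMessageContaining("mock close failure");

      // after the failed close, toDataFile() must also fail instead of returning a file
      Assertions.assertThatThrownBy(() -> dataWriter.toDataFile())
          .isInstanceOf(IllegalStateException.class);
    }
    ```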





[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #5311: DataWriter - failure to close should not add file to completedDataFiles

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #5311:
URL: https://github.com/apache/iceberg/pull/5311#discussion_r925896897


##########
aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java:
##########
@@ -27,21 +27,16 @@
 import java.nio.file.Path;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Random;

Review Comment:
   no wildcard imports





[GitHub] [iceberg] RussellSpitzer merged pull request #5311: S3OutputStream - failure to close should persist on subsequent close calls

Posted by GitBox <gi...@apache.org>.
RussellSpitzer merged PR #5311:
URL: https://github.com/apache/iceberg/pull/5311




[GitHub] [iceberg] abmo-x commented on a diff in pull request #5311: DataWriter - failure to close should not add file to completedDataFiles

Posted by GitBox <gi...@apache.org>.
abmo-x commented on code in PR #5311:
URL: https://github.com/apache/iceberg/pull/5311#discussion_r926109693


##########
core/src/test/java/org/apache/iceberg/io/TestDataWriter.java:
##########
@@ -0,0 +1,97 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *   http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.iceberg.io;
+
+import org.apache.iceberg.*;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestDataWriter {
+
+    DataWriter<String> dataWriter;
+    @Mock
+    FileAppender<String> appender;
+    @Mock
+    StructLike partition;
+    @Mock
+    EncryptionKeyMetadata keyMetadata;
+    @Rule
+    public TemporaryFolder temp = new TemporaryFolder();
+    private File file;
+
+    @Before
+    public void setUp() throws IOException {
+        file = temp.newFile();
+        file.deleteOnExit();
+        dataWriter = new DataWriter<>(appender, FileFormat.PARQUET, file.getPath(), PartitionSpec.unpartitioned(), partition, keyMetadata);
+    }
+
+    @Test
+    public void write() {
+        String row = "first_row";
+        dataWriter.write(row);
+        Mockito.verify(appender, Mockito.times(1)).add(row);
+    }
+
+    @Test
+    public void close() throws IOException {
+        Mockito.when(appender.length()).thenReturn(1L);
+        Mockito.when(appender.metrics()).thenReturn(Mockito.mock(Metrics.class));
+        Mockito.when(appender.splitOffsets()).thenReturn(Arrays.asList(1L));
+        dataWriter.close();
+        DataFile dataFile = dataWriter.toDataFile();
+        Assert.assertEquals(file.getPath(), dataFile.path());
+    }
+
+    @Test(expected=IllegalStateException.class)
+    public void closeFailureShouldNotReturnDataFile() throws IOException {
+        Mockito.doThrow(new IllegalStateException("mock close failure of appender"))
+                .when(appender).close();
+        try {
+            dataWriter.close();
+        } catch (Exception e) {
+            // ignore mocked failure on first call to close
+        }
+        dataWriter.close();
+        dataWriter.toDataFile();

Review Comment:
   Yes, L89 will execute, and we are verifying that here with the expectation that the call on L89 will throw an exception as expected.





[GitHub] [iceberg] abmo-x closed pull request #5311: S3OutputStream - failure to close should persist on subsequent close calls

Posted by GitBox <gi...@apache.org>.
abmo-x closed pull request #5311: S3OutputStream - failure to close should persist on subsequent close calls
URL: https://github.com/apache/iceberg/pull/5311




[GitHub] [iceberg] abmo-x commented on a diff in pull request #5311: DataWriter - failure to close should not add file to completedDataFiles

Posted by GitBox <gi...@apache.org>.
abmo-x commented on code in PR #5311:
URL: https://github.com/apache/iceberg/pull/5311#discussion_r925924259


##########
aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java:
##########
@@ -27,21 +27,16 @@
 import java.nio.file.Path;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Random;

Review Comment:
   cleaned them up. 





[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #5311: DataWriter - failure to close should not add file to completedDataFiles

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #5311:
URL: https://github.com/apache/iceberg/pull/5311#discussion_r925896218


##########
core/src/test/java/org/apache/iceberg/io/TestDataWriter.java:
##########
@@ -0,0 +1,97 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *   http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.iceberg.io;
+
+import org.apache.iceberg.*;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestDataWriter {
+
+    DataWriter<String> dataWriter;
+    @Mock
+    FileAppender<String> appender;
+    @Mock
+    StructLike partition;
+    @Mock
+    EncryptionKeyMetadata keyMetadata;
+    @Rule
+    public TemporaryFolder temp = new TemporaryFolder();
+    private File file;
+
+    @Before
+    public void setUp() throws IOException {
+        file = temp.newFile();
+        file.deleteOnExit();
+        dataWriter = new DataWriter<>(appender, FileFormat.PARQUET, file.getPath(), PartitionSpec.unpartitioned(), partition, keyMetadata);
+    }
+
+    @Test
+    public void write() {
+        String row = "first_row";
+        dataWriter.write(row);
+        Mockito.verify(appender, Mockito.times(1)).add(row);
+    }
+
+    @Test
+    public void close() throws IOException {
+        Mockito.when(appender.length()).thenReturn(1L);
+        Mockito.when(appender.metrics()).thenReturn(Mockito.mock(Metrics.class));
+        Mockito.when(appender.splitOffsets()).thenReturn(Arrays.asList(1L));
+        dataWriter.close();
+        DataFile dataFile = dataWriter.toDataFile();
+        Assert.assertEquals(file.getPath(), dataFile.path());
+    }
+
+    @Test(expected=IllegalStateException.class)
+    public void closeFailureShouldNotReturnDataFile() throws IOException {

Review Comment:
   👍 
   





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5311: S3OutputStream - failure to close should persist on subsequent close calls

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #5311:
URL: https://github.com/apache/iceberg/pull/5311#discussion_r935109853


##########
aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java:
##########
@@ -240,6 +241,13 @@ private void newStream() throws IOException {
 
   @Override
   public void close() throws IOException {
+
+    // A failed s3 close removes state that is required for a successful close.
+    if (closeFailureException != null) {
+      throw new IOException(
+          "Attempted to close a S3 stream that failed to close earlier", closeFailureException);

Review Comment:
   This makes sense to me.
   
   Nit: the error msg doesn't read smoothly. What about `Attempted to close a S3 output stream that has earlier close failed already`?





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5311: DataWriter - failure to close should not add file to completedDataFiles

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #5311:
URL: https://github.com/apache/iceberg/pull/5311#discussion_r930380976


##########
core/src/test/java/org/apache/iceberg/io/TestDataWriter.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestDataWriter {
+
+  DataWriter<String> dataWriter;
+  @Mock
+  FileAppender<String> appender;
+  @Mock
+  StructLike partition;
+  @Mock
+  EncryptionKeyMetadata keyMetadata;
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  private File file;
+
+  @Before
+  public void before() throws IOException {
+    file = temp.newFile();
+    file.deleteOnExit();
+    dataWriter = new DataWriter<>(appender,
+        FileFormat.PARQUET,
+        file.getPath(),
+        PartitionSpec.unpartitioned(),
+        partition,
+        keyMetadata);
+  }
+
+  @Test
+  public void write() {
+    String row = "first_row";
+    dataWriter.write(row);
+    Mockito.verify(appender, Mockito.times(1)).add(row);
+  }
+
+  @Test
+  public void close() throws IOException {
+    Mockito.when(appender.length()).thenReturn(1L);
+    Mockito.when(appender.metrics()).thenReturn(Mockito.mock(Metrics.class));
+    Mockito.when(appender.splitOffsets()).thenReturn(Arrays.asList(1L));
+    dataWriter.close();
+    DataFile dataFile = dataWriter.toDataFile();
+    Assert.assertEquals(file.getPath(), dataFile.path());
+  }
+
+  @Test
+  public void closeFailureShouldNotReturnDataFile() throws IOException {
+    Mockito.doThrow(new IllegalStateException("mock close failure of appender"))
+        .when(appender).close();
+    Assertions.assertThatThrownBy(() -> {
+      try {
+        dataWriter.close();
+      } catch (Exception e) {
+        // ignore mocked failure on first call to close
+      }
+      dataWriter.close();
+      dataWriter.toDataFile();
+    }).isInstanceOf(IllegalStateException.class);
+  }
+
+  @Test
+  public void toDataFile() {

Review Comment:
   nit: maybe make the test method name more informative. is `toDataFileOnEmptyWriterShouldFail` accurate? 





[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #5311: DataWriter - failure to close should not add file to completedDataFiles

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #5311:
URL: https://github.com/apache/iceberg/pull/5311#discussion_r925961847


##########
aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java:
##########
@@ -240,6 +242,23 @@ private void newStream() throws IOException {
 
   @Override
   public void close() throws IOException {
+
+    /**
+     * As calls to close() are used to mark a stream as completed and add the file
+     * to the completed data files in BaseTaskWriter we cannot just return if this

Review Comment:
   comma between "BaseTaskWriter" and "we cannot"









[GitHub] [iceberg] abmo-x commented on a diff in pull request #5311: S3OutputStream - failure to close should persist on subsequent close calls

Posted by GitBox <gi...@apache.org>.
abmo-x commented on code in PR #5311:
URL: https://github.com/apache/iceberg/pull/5311#discussion_r930427466


##########
core/src/test/java/org/apache/iceberg/io/TestDataWriter.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestDataWriter {
+
+  DataWriter<String> dataWriter;
+  @Mock
+  FileAppender<String> appender;
+  @Mock
+  StructLike partition;
+  @Mock
+  EncryptionKeyMetadata keyMetadata;
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  private File file;
+
+  @Before
+  public void before() throws IOException {
+    file = temp.newFile();
+    file.deleteOnExit();
+    dataWriter = new DataWriter<>(appender,
+        FileFormat.PARQUET,
+        file.getPath(),
+        PartitionSpec.unpartitioned(),
+        partition,
+        keyMetadata);
+  }
+
+  @Test
+  public void write() {
+    String row = "first_row";
+    dataWriter.write(row);
+    Mockito.verify(appender, Mockito.times(1)).add(row);
+  }
+
+  @Test
+  public void close() throws IOException {
+    Mockito.when(appender.length()).thenReturn(1L);
+    Mockito.when(appender.metrics()).thenReturn(Mockito.mock(Metrics.class));
+    Mockito.when(appender.splitOffsets()).thenReturn(Arrays.asList(1L));
+    dataWriter.close();
+    DataFile dataFile = dataWriter.toDataFile();
+    Assert.assertEquals(file.getPath(), dataFile.path());
+  }
+
+  @Test
+  public void closeFailureShouldNotReturnDataFile() throws IOException {
+    Mockito.doThrow(new IllegalStateException("mock close failure of appender"))
+        .when(appender).close();
+    Assertions.assertThatThrownBy(() -> {

Review Comment:
   reverted these changes. 



##########
core/src/test/java/org/apache/iceberg/io/TestDataWriter.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestDataWriter {
+
+  DataWriter<String> dataWriter;
+  @Mock
+  FileAppender<String> appender;
+  @Mock
+  StructLike partition;
+  @Mock
+  EncryptionKeyMetadata keyMetadata;
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  private File file;
+
+  @Before
+  public void before() throws IOException {
+    file = temp.newFile();
+    file.deleteOnExit();
+    dataWriter = new DataWriter<>(appender,
+        FileFormat.PARQUET,
+        file.getPath(),
+        PartitionSpec.unpartitioned(),
+        partition,
+        keyMetadata);
+  }
+
+  @Test
+  public void write() {
+    String row = "first_row";
+    dataWriter.write(row);
+    Mockito.verify(appender, Mockito.times(1)).add(row);
+  }
+
+  @Test
+  public void close() throws IOException {
+    Mockito.when(appender.length()).thenReturn(1L);
+    Mockito.when(appender.metrics()).thenReturn(Mockito.mock(Metrics.class));
+    Mockito.when(appender.splitOffsets()).thenReturn(Arrays.asList(1L));
+    dataWriter.close();
+    DataFile dataFile = dataWriter.toDataFile();
+    Assert.assertEquals(file.getPath(), dataFile.path());
+  }
+
+  @Test
+  public void closeFailureShouldNotReturnDataFile() throws IOException {
+    Mockito.doThrow(new IllegalStateException("mock close failure of appender"))
+        .when(appender).close();
+    Assertions.assertThatThrownBy(() -> {
+      try {
+        dataWriter.close();
+      } catch (Exception e) {
+        // ignore mocked failure on first call to close
+      }
+      dataWriter.close();
+      dataWriter.toDataFile();
+    }).isInstanceOf(IllegalStateException.class);
+  }
+
+  @Test
+  public void toDataFile() {

Review Comment:
   reverted these changes. 





[GitHub] [iceberg] abmo-x commented on pull request #5311: S3OutputStream - failure to close should persist on subsequent close calls

Posted by GitBox <gi...@apache.org>.
abmo-x commented on PR #5311:
URL: https://github.com/apache/iceberg/pull/5311#issuecomment-1196001504

   After further discussion with @RussellSpitzer, I brought back my changes to S3OutputStream.
   
   Since a failure to close an S3 stream leaves it in a bad state that cannot be recovered, any future calls to close that stream should continue to fail. The changes are now simple and confined to S3OutputStream.
   
   cc @rdblue 
   




[GitHub] [iceberg] rdblue commented on pull request #5311: DataWriter - failure to close should not add file to completedDataFiles

Posted by GitBox <gi...@apache.org>.
rdblue commented on PR #5311:
URL: https://github.com/apache/iceberg/pull/5311#issuecomment-1190740182

   I agree with @RussellSpitzer, I think we should avoid the double close, since that is what is causing the problem (at least as far as I understand).




[GitHub] [iceberg] abmo-x commented on a diff in pull request #5311: S3OutputStream - failure to close should persist on subsequent close calls

Posted by GitBox <gi...@apache.org>.
abmo-x commented on code in PR #5311:
URL: https://github.com/apache/iceberg/pull/5311#discussion_r935128214


##########
aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java:
##########
@@ -240,6 +241,13 @@ private void newStream() throws IOException {
 
   @Override
   public void close() throws IOException {
+
+    // A failed s3 close removes state that is required for a successful close.
+    if (closeFailureException != null) {
+      throw new IOException(
+          "Attempted to close a S3 stream that failed to close earlier", closeFailureException);

Review Comment:
   Thanks for the suggestion @stevenzwu, made a few changes based on it.





[GitHub] [iceberg] abmo-x commented on pull request #5311: S3OutputStream - failure to close should persist on subsequent close calls

Posted by GitBox <gi...@apache.org>.
abmo-x commented on PR #5311:
URL: https://github.com/apache/iceberg/pull/5311#issuecomment-1202030169

   > @abmo-x Based on the description, it seems to me that application shouldn't have caught and swallowed all exceptions in the process function. I know it tries to catch "bad" record and route to a dead-letter table. It probably should just catch and handle the serialization specific exceptions.
   
   Yes, the application code will be updated to handle exceptions correctly as well. Since there are [other cases](https://github.com/apache/iceberg/pull/1749) where the S3 output stream can still be used after it is closed, I think this defensive code helps ensure bad data files are not added to the metadata.




[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5311: S3OutputStream - failure to close should persist on subsequent close calls

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #5311:
URL: https://github.com/apache/iceberg/pull/5311#discussion_r935111103


##########
aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java:
##########
@@ -257,6 +258,13 @@ private void newStream() throws IOException {
 
   @Override
   public void close() throws IOException {
+
+    // A failed s3 close removes state that is required for a successful close.

Review Comment:
   Nit: wondering if the following comment is a little clearer, based on Russell's comment
   
   ```
   A previously failed close left the S3 output stream in a bad state that is non-recoverable.
   Any future close on this stream should fail.
   ```





[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #5311: S3OutputStream - failure to close should persist on subsequent close calls

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #5311:
URL: https://github.com/apache/iceberg/pull/5311#discussion_r933455961


##########
aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java:
##########
@@ -240,6 +241,13 @@ private void newStream() throws IOException {
 
   @Override
   public void close() throws IOException {
+
+    // A failed s3 close removes state that is required for a successful close.
+    if (closeFailureException != null) {
+      throw new IOException(
+          "Attempted to close a S3 stream that failed to close earlier", closeFailureException);

Review Comment:
   @rdblue We had a discussion about this offline, and I think this is probably the only safe thing to do. While double-closing shouldn't normally throw an exception, if the close failed then any future close is also a failure. The S3OutputStream loses its internal state on close, which means a subsequent close call can never succeed once the IOException has been handled.
   
   Because we can't ever retry successfully, we decided that the stream should keep throwing exceptions so that a caller does not falsely believe that a retried close succeeded when the original close failed.
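
   In code, that policy is roughly the record-and-rethrow pattern sketched below (field and helper names follow the diff quoted above; the body is illustrative, not the exact merged change):

    ```java
    private RuntimeException closeFailureException;  // remembered after the first failed close

    @Override
    public void close() throws IOException {
      if (closeFailureException != null) {
        // the staged data and upload state from the failed attempt are gone; retrying cannot succeed
        throw new IOException(
            "Attempted to close a S3 stream that failed to close earlier", closeFailureException);
      }

      try {
        stream.close();      // flush the local staging stream
        completeUploads();   // putObject or completeMultipartUpload against S3
      } catch (RuntimeException e) {
        closeFailureException = e;  // keep the failure so every later close() also fails
        throw e;
      }
    }
    ```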





[GitHub] [iceberg] stevenzwu commented on pull request #5311: S3OutputStream - failure to close should persist on subsequent close calls

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on PR #5311:
URL: https://github.com/apache/iceberg/pull/5311#issuecomment-1202016126

   @abmo-x Based on the description, it seems to me that application shouldn't have caught and swallowed all exceptions in the process function. I know it tries to catch "bad" record and route to a dead-letter table. It probably should just catch and handle the serialization specific exceptions.
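
   Continuing the earlier ProcessFunction sketch from the PR description, the narrower handling suggested here would guard only the conversion step (convert() and the use of RuntimeException as the record-level failure are illustrative assumptions):

    ```java
    @Override
    public void processElement(String record, Context ctx, Collector<String> out) {
      final String converted;
      try {
        converted = convert(record);   // only the record-level conversion is guarded
      } catch (RuntimeException badRecord) {
        // route the bad record to a dead-letter path; do not swallow writer failures here
        return;
      }
      // failures from the chained IcebergStreamWriter (including a failed S3 close)
      // now propagate and fail the task instead of being silently ignored
      out.collect(converted);
    }

    private String convert(String record) {
      return record.trim();            // stand-in for the real, failure-prone mapping
    }
    ```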




[GitHub] [iceberg] abmo-x commented on pull request #5311: DataWriter - failure to close should not add file to completedDataFiles

Posted by GitBox <gi...@apache.org>.
abmo-x commented on PR #5311:
URL: https://github.com/apache/iceberg/pull/5311#issuecomment-1194655407

   Reverted the changes to S3OutputStream to keep the close API consistent.




[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #5311: DataWriter - failure to close should not add file to completedDataFiles

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #5311:
URL: https://github.com/apache/iceberg/pull/5311#discussion_r930351044


##########
core/src/main/java/org/apache/iceberg/io/DataWriter.java:
##########
@@ -78,7 +80,14 @@ public long length() {
 
   @Override
   public void close() throws IOException {
-    if (dataFile == null) {
+    /* As close is called couple times from BaseTaskWriter
+      on rollToNewFile and close, we don't want to create a data file
+      if appender.close was already called and failed due to an internal close failure.
+      Subsequent call to close after failure should not create a data file and keep it null.
+      We have observed appender.close failed when S3OutputStream.close fails and subsequent call
+      from flink tries to close the writer and use the data file which was never uploaded.
+    */
+    if (closed.compareAndSet(false, true)) {

Review Comment:
   Do we need a new variable here, or can we make dataFile into an AtomicReference?
   


