You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ru...@apache.org on 2021/12/04 16:43:45 UTC

[iceberg] branch master updated: Core: Throw CommitStateUnknownException in old RewriteDatafilesAction (#2932)

This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new c6e68d8  Core: Throw CommitStateUnknownException in old RewriteDatafilesAction (#2932)
c6e68d8 is described below

commit c6e68d832113266994c0eb06d342e032fc9cbca3
Author: wgcn <10...@qq.com>
AuthorDate: Sun Dec 5 00:43:33 2021 +0800

    Core: Throw CommitStateUnknownException in old RewriteDatafilesAction (#2932)
    
    During the commit phase of the old RewriteDatafiles action, the commit can fail in a way that leaves its state unknown: the rewrite may or may not actually have been committed. In that case, cleaning up the newly written data files is dangerous and must be avoided, because those files may already be referenced by the committed table state. This issue is already fixed in the new API, but that API is not yet available for all frameworks, so the fix is backported here.
---
 .../iceberg/actions/BaseRewriteDataFilesAction.java  | 20 ++++++++++++++++----
 1 file changed, 16 insertions(+), 4 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
index 5c0cf7a..1d9b25b 100644
--- a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
+++ b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
@@ -32,11 +32,13 @@ import org.apache.iceberg.RewriteFiles;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
@@ -271,11 +273,12 @@ public abstract class BaseRewriteDataFilesAction<ThisT>
   private void replaceDataFiles(Iterable<DataFile> deletedDataFiles, Iterable<DataFile> addedDataFiles,
                                 long startingSnapshotId) {
     try {
-      RewriteFiles rewriteFiles = table.newRewrite()
-          .validateFromSnapshot(startingSnapshotId)
-          .rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles));
-      commit(rewriteFiles);
+      doReplace(deletedDataFiles, addedDataFiles, startingSnapshotId);
+    } catch (CommitStateUnknownException e) {
+      LOG.warn("Commit state unknown, cannot clean up files that may have been committed", e);
+      throw e;
     } catch (Exception e) {
+      LOG.warn("Failed to commit rewrite, cleaning up rewritten files", e);
       Tasks.foreach(Iterables.transform(addedDataFiles, f -> f.path().toString()))
           .noRetry()
           .suppressFailureWhenFinished()
@@ -285,6 +288,15 @@ public abstract class BaseRewriteDataFilesAction<ThisT>
     }
   }
 
+  @VisibleForTesting
+  void doReplace(Iterable<DataFile> deletedDataFiles, Iterable<DataFile> addedDataFiles,
+      long startingSnapshotId) {
+    RewriteFiles rewriteFiles = table.newRewrite()
+        .validateFromSnapshot(startingSnapshotId)
+        .rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles));
+    commit(rewriteFiles);
+  }
+
   private boolean isPartialFileScan(CombinedScanTask task) {
     if (task.files().size() == 1) {
       FileScanTask fileScanTask = task.files().iterator().next();