Posted to commits@iceberg.apache.org by ru...@apache.org on 2022/05/07 01:39:08 UTC

[iceberg] branch master updated: Spark: Fix Commit State Unknown Handling (#4687)

This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 61142bde8 Spark: Fix Commit State Unknown Handling (#4687)
61142bde8 is described below

commit 61142bde8859e3334231dcf2fa6f55e38d288baf
Author: Russell Spitzer <rs...@apple.com>
AuthorDate: Fri May 6 20:39:02 2022 -0500

    Spark: Fix Commit State Unknown Handling (#4687)
    
    Previously, the SnapshotUpdate operation could throw a
    CommitStateUnknownException, which would cause Spark's DataSource to call
    its abort method. In that case we cannot remove the underlying data files,
    since those files may actually have been committed successfully. We
    previously added handling to prevent the removal of metadata files, but
    the same issue is also possible during data file cleanup.
---
 .../java/org/apache/iceberg/PendingUpdate.java     |  3 +
 .../apache/iceberg/spark/source/SparkWrite.java    | 44 ++++++++-----
 .../apache/iceberg/spark/source/ManualSource.java  | 74 ++++++++++++++++++++++
 .../iceberg/spark/source/TestSparkDataWrite.java   | 62 ++++++++++++++++++
 4 files changed, 167 insertions(+), 16 deletions(-)
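
Before the full diff, a minimal self-contained sketch of the pattern this change applies. The SnapshotUpdate operation, the cleanupOnAbort flag, and the skip-cleanup-on-unknown behavior mirror the diff below; the surrounding class is purely illustrative and is not the actual SparkWrite implementation.

    import org.apache.iceberg.SnapshotUpdate;
    import org.apache.iceberg.exceptions.CommitStateUnknownException;

    // Simplified illustration of the commit/abort interplay addressed by this fix.
    class CommitStateAwareWrite {
      private boolean cleanupOnAbort = true;

      void commitOperation(SnapshotUpdate<?> operation) {
        try {
          operation.commit(); // may throw CommitStateUnknownException
        } catch (CommitStateUnknownException e) {
          // The commit may have succeeded even though the client could not confirm it,
          // so a later abort() must not delete the data files that were written.
          cleanupOnAbort = false;
          throw e;
        }
      }

      void abort(Iterable<String> writtenFilePaths) {
        if (!cleanupOnAbort) {
          // Commit outcome unknown: leave the files in place rather than risk
          // deleting data that a successful commit may now reference.
          return;
        }
        // ... delete writtenFilePaths as in the existing abort logic ...
      }
    }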

diff --git a/api/src/main/java/org/apache/iceberg/PendingUpdate.java b/api/src/main/java/org/apache/iceberg/PendingUpdate.java
index 8e47dd006..9c1b18434 100644
--- a/api/src/main/java/org/apache/iceberg/PendingUpdate.java
+++ b/api/src/main/java/org/apache/iceberg/PendingUpdate.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg;
 
 import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.ValidationException;
 
 /**
@@ -49,6 +50,8 @@ public interface PendingUpdate<T> {
    *
    * @throws ValidationException If the update cannot be applied to the current table metadata.
    * @throws CommitFailedException If the update cannot be committed due to conflicts.
+   * @throws CommitStateUnknownException If it is unknown whether the update succeeded or failed; no cleanup
+   * should be done in this case.
    */
   void commit();
 
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 6a6ecd206..24ac36335 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -42,6 +42,7 @@ import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.ClusteredDataWriter;
@@ -112,6 +113,8 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
   private final Distribution requiredDistribution;
   private final SortOrder[] requiredOrdering;
 
+  private boolean cleanupOnAbort = true;
+
   SparkWrite(SparkSession spark, Table table, SparkWriteConf writeConf,
              LogicalWriteInfo writeInfo, String applicationId,
              Schema writeSchema, StructType dsSchema,
@@ -195,25 +198,34 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
       operation.stageOnly();
     }
 
-    long start = System.currentTimeMillis();
-    operation.commit(); // abort is automatically called if this fails
-    long duration = System.currentTimeMillis() - start;
-    LOG.info("Committed in {} ms", duration);
+    try {
+      long start = System.currentTimeMillis();
+      operation.commit(); // abort is automatically called if this fails
+      long duration = System.currentTimeMillis() - start;
+      LOG.info("Committed in {} ms", duration);
+    } catch (CommitStateUnknownException commitStateUnknownException) {
+      cleanupOnAbort = false;
+      throw commitStateUnknownException;
+    }
   }
 
   private void abort(WriterCommitMessage[] messages) {
-    Map<String, String> props = table.properties();
-    Tasks.foreach(files(messages))
-        .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
-        .exponentialBackoff(
-            PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
-            PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
-            PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
-            2.0 /* exponential */)
-        .throwFailureWhenFinished()
-        .run(file -> {
-          table.io().deleteFile(file.path().toString());
-        });
+    if (cleanupOnAbort) {
+      Map<String, String> props = table.properties();
+      Tasks.foreach(files(messages))
+          .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+          .exponentialBackoff(
+              PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+              PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+              PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+              2.0 /* exponential */)
+          .throwFailureWhenFinished()
+          .run(file -> {
+            table.io().deleteFile(file.path().toString());
+          });
+    } else {
+      LOG.warn("Skipping cleaning up of data files because Iceberg was unable to determine the final commit state");
+    }
   }
 
   private Iterable<DataFile> files(WriterCommitMessage[] messages) {
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java
new file mode 100644
index 000000000..0f8c8b3b6
--- /dev/null
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableProvider;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class ManualSource implements TableProvider, DataSourceRegister {
+  public static final String SHORT_NAME = "manual_source";
+  public static final String TABLE_NAME = "TABLE_NAME";
+  private static final Map<String, Table> tableMap = Maps.newHashMap();
+
+  public static void setTable(String name, Table table) {
+    Preconditions.checkArgument(!tableMap.containsKey(name), "Cannot set " + name + ". It is already set");
+    tableMap.put(name, table);
+  }
+
+  public static void clearTables() {
+    tableMap.clear();
+  }
+
+  @Override
+  public String shortName() {
+    return SHORT_NAME;
+  }
+
+  @Override
+  public StructType inferSchema(CaseInsensitiveStringMap options) {
+    return getTable(null, null, options).schema();
+  }
+
+  @Override
+  public Transform[] inferPartitioning(CaseInsensitiveStringMap options) {
+    return getTable(null, null, options).partitioning();
+  }
+
+  @Override
+  public org.apache.spark.sql.connector.catalog.Table getTable(
+      StructType schema, Transform[] partitioning, Map<String, String> properties) {
+    Preconditions.checkArgument(properties.containsKey(TABLE_NAME), "Missing property " + TABLE_NAME);
+    String tableName = properties.get(TABLE_NAME);
+    Preconditions.checkArgument(tableMap.containsKey(tableName), "Table missing " + tableName);
+    return tableMap.get(tableName);
+  }
+
+  @Override
+  public boolean supportsExternalMetadata() {
+    return false;
+  }
+}
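
For quick reference, a short usage sketch of ManualSource from a test. The new test case added below does exactly this; the table name, DataFrame, and location variables here are placeholders, and the snippet assumes the usual Spark SQL imports (SaveMode, Dataset).

    // Register a Spark Table under a name known to ManualSource (placeholder name).
    ManualSource.setTable("my_test_table", sparkTable);

    // Write through the source by its full class name, passing the registered name.
    df.write()
        .format("org.apache.iceberg.spark.source.ManualSource")
        .option(ManualSource.TABLE_NAME, "my_test_table")
        .mode(SaveMode.Append)
        .save(location.toString());

    // Clear the static registry so later runs can register fresh tables.
    ManualSource.clearTables();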
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index f9241758c..8bf57ba65 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Locale;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.ManifestFile;
@@ -32,11 +34,13 @@ import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
@@ -54,6 +58,9 @@ import org.junit.runners.Parameterized;
 
 import static org.apache.iceberg.TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED;
 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 @RunWith(Parameterized.class)
 public class TestSparkDataWrite {
@@ -78,6 +85,11 @@ public class TestSparkDataWrite {
     TestSparkDataWrite.spark = SparkSession.builder().master("local[2]").getOrCreate();
   }
 
+  @Parameterized.AfterParam
+  public static void clearSourceCache() {
+    ManualSource.clearTables();
+  }
+
   @AfterClass
   public static void stopSpark() {
     SparkSession currentSpark = TestSparkDataWrite.spark;
@@ -588,6 +600,56 @@ public class TestSparkDataWrite {
     Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
   }
 
+  @Test
+  public void testCommitUnknownException() throws IOException {
+    File parent = temp.newFolder(format.toString());
+    File location = new File(parent, "commitunknown");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+
+    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+
+    AppendFiles append = table.newFastAppend();
+    AppendFiles spyAppend = spy(append);
+    doAnswer(invocation -> {
+      append.commit();
+      throw new CommitStateUnknownException(new RuntimeException("Datacenter on Fire"));
+    }).when(spyAppend).commit();
+
+    Table spyTable = spy(table);
+    when(spyTable.newAppend()).thenReturn(spyAppend);
+    SparkTable sparkTable = new SparkTable(spyTable, false);
+
+    String manualTableName = "unknown_exception";
+    ManualSource.setTable(manualTableName, sparkTable);
+
+    // Although an exception is thrown here, write and commit have succeeded
+    AssertHelpers.assertThrowsWithCause("Should throw a Commit State Unknown Exception",
+        SparkException.class,
+        "Writing job aborted",
+        CommitStateUnknownException.class,
+        "Datacenter on Fire",
+        () -> df.select("id", "data").sort("data").write()
+            .format("org.apache.iceberg.spark.source.ManualSource")
+            .option(ManualSource.TABLE_NAME, manualTableName)
+            .mode(SaveMode.Append)
+            .save(location.toString()));
+
+    // Since write and commit succeeded, the rows should be readable
+    Dataset<Row> result = spark.read().format("iceberg").load(location.toString());
+    List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    Assert.assertEquals("Number of rows should match", records.size(), actual.size());
+    Assert.assertEquals("Result rows should match", records, actual);
+  }
+
   public enum IcebergOptionsType {
     NONE,
     TABLE,