Posted to commits@iceberg.apache.org by dw...@apache.org on 2022/06/23 20:43:33 UTC

[iceberg] branch master updated: Spark 3.2: RewriteDataFiles - Escape special characters in table identifiers (#5112)

This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ef656ba0 Spark 3.2: RewriteDataFiles - Escape special characters in table identifiers (#5112)
6ef656ba0 is described below

commit 6ef656ba0c8f82d0e298947b5eb6bcc534f84ed3
Author: Bijan Houle <bi...@users.noreply.github.com>
AuthorDate: Thu Jun 23 14:43:27 2022 -0600

    Spark 3.2: RewriteDataFiles - Escape special characters in table identifiers (#5112)
    
    * Spark 3.2: Escape table identifier in RewriteDataFiles Procedure/Action
    
    Allows identifiers containing special characters, e.g. `db.special-chars`.`table.special-chars`
    
    * Mark the original method as deprecated
    
    * Add a @deprecated tag in the Javadoc as well
    
    * Keep the existing interface
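    
    For illustration, a minimal sketch of the new overload (assuming an
    active SparkSession and an Iceberg `table` in scope; the catalog,
    namespace, and table names below are hypothetical):
    
        import org.apache.iceberg.spark.Spark3Util;
        import org.apache.iceberg.spark.actions.SparkActions;
        import org.apache.spark.sql.connector.catalog.Identifier;
    
        // Build a fully quoted identifier such as
        // `spark_catalog`.`db.special-chars`.`table.special-chars`
        String fullIdent = Spark3Util.quotedFullIdentifier(
            "spark_catalog",
            Identifier.of(new String[] {"db.special-chars"}, "table.special-chars"));
    
        // Pass it to the new overload so the compaction's reads and writes
        // resolve the table even when its identifier requires escaping
        SparkActions.get()
            .rewriteDataFiles(table, fullIdent)
            .binPack()
            .execute();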
---
 .../apache/iceberg/actions/ActionsProvider.java    | 11 ++++++++
 .../action/IcebergSortCompactionBenchmark.java     | 29 +++++++++++-----------
 .../java/org/apache/iceberg/spark/Spark3Util.java  | 14 +++++++++++
 .../actions/BaseRewriteDataFilesSparkAction.java   | 11 +++++++-
 .../apache/iceberg/spark/actions/SparkActions.java |  6 +++++
 .../spark/actions/SparkBinPackStrategy.java        | 15 +++++++++--
 .../procedures/RewriteDataFilesProcedure.java      |  9 ++++---
 .../spark/actions/TestRewriteDataFilesAction.java  | 18 ++++++++------
 8 files changed, 86 insertions(+), 27 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java b/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java
index f2564ddb7..2a73809d6 100644
--- a/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java
+++ b/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java
@@ -56,11 +56,22 @@ public interface ActionsProvider {
 
   /**
    * Instantiates an action to rewrite data files.
+   * @deprecated please use {@link #rewriteDataFiles(Table, String)}
    */
+  @Deprecated
   default RewriteDataFiles rewriteDataFiles(Table table) {
     throw new UnsupportedOperationException(this.getClass().getName() + " does not implement rewriteDataFiles");
   }
 
+  /**
+   * Instantiates an action to rewrite data files.
+   */
+  default RewriteDataFiles rewriteDataFiles(Table table, String fullIdentifier) {
+    throw new UnsupportedOperationException(
+        this.getClass().getName() + " does not implement rewriteDataFiles(Table, String)"
+    );
+  }
+
   /**
    * Instantiates an action to expire snapshots.
    */
diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
index 8c205037f..17f9e2386 100644
--- a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
+++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
@@ -74,6 +74,7 @@ public class IcebergSortCompactionBenchmark {
   private static final String[] NAMESPACE = new String[] {"default"};
   private static final String NAME = "sortbench";
   private static final Identifier IDENT = Identifier.of(NAMESPACE, NAME);
+  private static final String FULL_IDENT = Spark3Util.quotedFullIdentifier("spark_catalog", IDENT);
   private static final int NUM_FILES = 8;
   private static final long NUM_ROWS = 7500000L;
   private static final long UNIQUE_VALUES = NUM_ROWS / 4;
@@ -106,7 +107,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void sortInt() {
     SparkActions.get()
-        .rewriteDataFiles(table())
+        .rewriteDataFiles(table(), FULL_IDENT)
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .sort(SortOrder
             .builderFor(table().schema())
@@ -119,7 +120,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void sortInt2() {
     SparkActions.get()
-        .rewriteDataFiles(table())
+        .rewriteDataFiles(table(), FULL_IDENT)
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .sort(SortOrder
             .builderFor(table().schema())
@@ -133,7 +134,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void sortInt3() {
     SparkActions.get()
-        .rewriteDataFiles(table())
+        .rewriteDataFiles(table(), FULL_IDENT)
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .sort(SortOrder
             .builderFor(table().schema())
@@ -149,7 +150,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void sortInt4() {
     SparkActions.get()
-        .rewriteDataFiles(table())
+        .rewriteDataFiles(table(), FULL_IDENT)
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .sort(SortOrder
             .builderFor(table().schema())
@@ -165,7 +166,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void sortString() {
     SparkActions.get()
-        .rewriteDataFiles(table())
+        .rewriteDataFiles(table(), FULL_IDENT)
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .sort(SortOrder
             .builderFor(table().schema())
@@ -178,7 +179,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void sortFourColumns() {
     SparkActions.get()
-        .rewriteDataFiles(table())
+        .rewriteDataFiles(table(), FULL_IDENT)
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .sort(SortOrder
             .builderFor(table().schema())
@@ -194,7 +195,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void sortSixColumns() {
     SparkActions.get()
-        .rewriteDataFiles(table())
+        .rewriteDataFiles(table(), FULL_IDENT)
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .sort(SortOrder
             .builderFor(table().schema())
@@ -212,7 +213,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void zSortInt() {
     SparkActions.get()
-        .rewriteDataFiles(table())
+        .rewriteDataFiles(table(), FULL_IDENT)
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .zOrder("intCol")
         .execute();
@@ -222,7 +223,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void zSortInt2() {
     SparkActions.get()
-        .rewriteDataFiles(table())
+        .rewriteDataFiles(table(), FULL_IDENT)
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .zOrder("intCol", "intCol2")
         .execute();
@@ -232,7 +233,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void zSortInt3() {
     SparkActions.get()
-        .rewriteDataFiles(table())
+        .rewriteDataFiles(table(), FULL_IDENT)
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .zOrder("intCol", "intCol2", "intCol3")
         .execute();
@@ -242,7 +243,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void zSortInt4() {
     SparkActions.get()
-        .rewriteDataFiles(table())
+        .rewriteDataFiles(table(), FULL_IDENT)
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .zOrder("intCol", "intCol2", "intCol3", "intCol4")
         .execute();
@@ -252,7 +253,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void zSortString() {
     SparkActions.get()
-        .rewriteDataFiles(table())
+        .rewriteDataFiles(table(), FULL_IDENT)
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .zOrder("stringCol")
         .execute();
@@ -262,7 +263,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void zSortFourColumns() {
     SparkActions.get()
-        .rewriteDataFiles(table())
+        .rewriteDataFiles(table(), FULL_IDENT)
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .zOrder("stringCol", "intCol", "dateCol", "doubleCol")
         .execute();
@@ -272,7 +273,7 @@ public class IcebergSortCompactionBenchmark {
   @Threads(1)
   public void zSortSixColumns() {
     SparkActions.get()
-        .rewriteDataFiles(table())
+        .rewriteDataFiles(table(), FULL_IDENT)
         .option(BinPackStrategy.REWRITE_ALL, "true")
         .zOrder("stringCol", "intCol", "dateCol", "timestampCol", "doubleCol", "longCol")
         .execute();
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
index 94146dbf2..b7d293675 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
@@ -67,6 +67,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException;
 import org.apache.spark.sql.catalyst.parser.ParserInterface;
 import org.apache.spark.sql.connector.catalog.CatalogManager;
 import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -749,6 +750,19 @@ public class Spark3Util {
     return TableIdentifier.of(Namespace.of(identifier.namespace()), identifier.name());
   }
 
+  public static String quotedFullIdentifier(String catalogName, Identifier identifier) {
+    List<String> parts =
+        ImmutableList.<String>builder()
+            .add(catalogName)
+            .addAll(Arrays.asList(identifier.namespace()))
+            .add(identifier.name())
+            .build();
+
+    return CatalogV2Implicits.MultipartIdentifierHelper(
+        JavaConverters.asScalaIteratorConverter(parts.iterator()).asScala().toSeq()
+    ).quoted();
+  }
+
   /**
    * Use Spark to list all partitions in the table.
    *
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
index 3a8d8a81f..7572cf668 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
@@ -87,6 +87,7 @@ public class BaseRewriteDataFilesSparkAction
   );
 
   private final Table table;
+  private final String fullIdentifier;
 
   private Expression filter = Expressions.alwaysTrue();
   private int maxConcurrentFileGroupRewrites;
@@ -96,9 +97,17 @@ public class BaseRewriteDataFilesSparkAction
   private RewriteJobOrder rewriteJobOrder;
   private RewriteStrategy strategy = null;
 
+  @Deprecated
   protected BaseRewriteDataFilesSparkAction(SparkSession spark, Table table) {
     super(spark);
     this.table = table;
+    this.fullIdentifier = null;
+  }
+
+  protected BaseRewriteDataFilesSparkAction(SparkSession spark, Table table, String fullIdentifier) {
+    super(spark);
+    this.table = table;
+    this.fullIdentifier = fullIdentifier;
   }
 
   @Override
@@ -428,7 +437,7 @@ public class BaseRewriteDataFilesSparkAction
   }
 
   private BinPackStrategy binPackStrategy() {
-    return new SparkBinPackStrategy(table, spark());
+    return new SparkBinPackStrategy(table, fullIdentifier, spark());
   }
 
   private SortStrategy sortStrategy() {
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
index d77a13d12..4f6bb3f00 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
@@ -72,10 +72,16 @@ public class SparkActions implements ActionsProvider {
   }
 
   @Override
+  @Deprecated
   public RewriteDataFiles rewriteDataFiles(Table table) {
     return new BaseRewriteDataFilesSparkAction(spark, table);
   }
 
+  @Override
+  public RewriteDataFiles rewriteDataFiles(Table table, String fullIdentifier) {
+    return new BaseRewriteDataFilesSparkAction(spark, table, fullIdentifier);
+  }
+
   @Override
   public DeleteOrphanFiles deleteOrphanFiles(Table table) {
     return new BaseDeleteOrphanFilesSparkAction(spark, table);
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
index 79ff45a99..fcfaeaa75 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
@@ -38,13 +38,24 @@ import org.apache.spark.sql.internal.SQLConf;
 
 public class SparkBinPackStrategy extends BinPackStrategy {
   private final Table table;
+  private final String fullIdentifier;
   private final SparkSession spark;
   private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
   private final FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
 
+  public SparkBinPackStrategy(Table table, String fullIdentifier, SparkSession spark) {
+    this.table = table;
+    this.spark = spark;
+    // Fallback if a quoted identifier is not supplied
+    this.fullIdentifier = fullIdentifier == null ? table.name() : fullIdentifier;
+  }
+
+  @Deprecated
   public SparkBinPackStrategy(Table table, SparkSession spark) {
     this.table = table;
     this.spark = spark;
+    // Fallback if a quoted identifier is not supplied
+    this.fullIdentifier = table.name();
   }
 
   @Override
@@ -66,7 +77,7 @@ public class SparkBinPackStrategy extends BinPackStrategy {
           .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
           .option(SparkReadOptions.SPLIT_SIZE, splitSize(inputFileSize(filesToRewrite)))
           .option(SparkReadOptions.FILE_OPEN_COST, "0")
-          .load(table.name());
+          .load(fullIdentifier);
 
       // All files within a file group are written with the same spec, so check the first
       boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table.spec());
@@ -82,7 +93,7 @@ public class SparkBinPackStrategy extends BinPackStrategy {
           .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
           .option(SparkWriteOptions.DISTRIBUTION_MODE, distributionMode)
           .mode("append")
-          .save(table.name());
+          .save(fullIdentifier);
 
       return rewriteCoordinator.fetchNewDataFiles(table, groupID);
     } finally {
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
index b33eab6b5..1f6b34fd2 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
@@ -24,6 +24,7 @@ import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.actions.RewriteDataFiles;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -44,7 +45,7 @@ import scala.runtime.BoxedUnit;
 /**
  * A procedure that rewrites datafiles in a table.
  *
- * @see org.apache.iceberg.spark.actions.SparkActions#rewriteDataFiles(Table)
+ * @see org.apache.iceberg.spark.actions.SparkActions#rewriteDataFiles(Table, String)
  */
 class RewriteDataFilesProcedure extends BaseProcedure {
 
@@ -90,7 +91,8 @@ class RewriteDataFilesProcedure extends BaseProcedure {
     Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
 
     return modifyIcebergTable(tableIdent, table -> {
-      RewriteDataFiles action = actions().rewriteDataFiles(table);
+      String quotedFullIdentifier = Spark3Util.quotedFullIdentifier(tableCatalog().name(), tableIdent);
+      RewriteDataFiles action = actions().rewriteDataFiles(table, quotedFullIdentifier);
 
       String strategy = args.isNullAt(1) ? null : args.getString(1);
       String sortOrderString = args.isNullAt(2) ? null : args.getString(2);
@@ -107,7 +109,8 @@ class RewriteDataFilesProcedure extends BaseProcedure {
       }
 
       String where = args.isNullAt(4) ? null : args.getString(4);
-      action = checkAndApplyFilter(action, where, table.name());
+
+      action = checkAndApplyFilter(action, where, quotedFullIdentifier);
 
       RewriteDataFiles.Result result = action.execute();
 
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index be1285087..f7394f645 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -135,7 +135,7 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
   private RewriteDataFiles basicRewrite(Table table) {
     // Always compact regardless of input files
     table.refresh();
-    return actions().rewriteDataFiles(table).option(BinPackStrategy.MIN_INPUT_FILES, "1");
+    return actions().rewriteDataFiles(table, tableLocation).option(BinPackStrategy.MIN_INPUT_FILES, "1");
   }
 
   @Test
@@ -258,7 +258,7 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
     rowDelta.commit();
     table.refresh();
     List<Object[]> expectedRecords = currentData();
-    Result result = actions().rewriteDataFiles(table)
+    Result result = actions().rewriteDataFiles(table, tableLocation)
         // do not include any file based on bin pack file size configs
         .option(BinPackStrategy.MIN_FILE_SIZE_BYTES, "0")
         .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE - 1))
@@ -292,7 +292,7 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
     rowDelta.commit();
     table.refresh();
     List<Object[]> expectedRecords = currentData();
-    Result result = actions().rewriteDataFiles(table)
+    Result result = actions().rewriteDataFiles(table, tableLocation)
         .option(BinPackStrategy.DELETE_FILE_THRESHOLD, "1")
         .execute();
     Assert.assertEquals("Action should rewrite 1 data files", 1, result.rewrittenDataFilesCount());
@@ -1122,13 +1122,17 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
     Table table = createTable(1);
 
     AssertHelpers.assertThrows("Should be unable to set Strategy more than once", IllegalArgumentException.class,
-        "Cannot set strategy", () -> actions().rewriteDataFiles(table).binPack().sort());
+        "Cannot set strategy", () -> actions().rewriteDataFiles(table, tableLocation).binPack().sort());
 
     AssertHelpers.assertThrows("Should be unable to set Strategy more than once", IllegalArgumentException.class,
-        "Cannot set strategy", () -> actions().rewriteDataFiles(table).sort().binPack());
+        "Cannot set strategy", () -> actions().rewriteDataFiles(table, tableLocation).sort().binPack());
 
-    AssertHelpers.assertThrows("Should be unable to set Strategy more than once", IllegalArgumentException.class,
-        "Cannot set strategy", () -> actions().rewriteDataFiles(table).sort(SortOrder.unsorted()).binPack());
+    AssertHelpers.assertThrows(
+        "Should be unable to set Strategy more than once",
+        IllegalArgumentException.class,
+        "Cannot set strategy",
+        () ->
+            actions().rewriteDataFiles(table, tableLocation).sort(SortOrder.unsorted()).binPack());
   }
 
   @Test