Posted to commits@iceberg.apache.org by dw...@apache.org on 2022/06/23 20:43:33 UTC
[iceberg] branch master updated: Spark 3.2: RewriteDataFiles - Escape special characters in table identifiers (#5112)
This is an automated email from the ASF dual-hosted git repository.
dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 6ef656ba0 Spark 3.2: RewriteDataFiles - Escape special characters in table identifiers (#5112)
6ef656ba0 is described below
commit 6ef656ba0c8f82d0e298947b5eb6bcc534f84ed3
Author: Bijan Houle <bi...@users.noreply.github.com>
AuthorDate: Thu Jun 23 14:43:27 2022 -0600
Spark 3.2: RewriteDataFiles - Escape special characters in table identifiers (#5112)
* Spark 3.2: Escape table identifier in RewriteDataFiles Procedure/Action
Allows e.g. `db.special-chars`.`table.special-chars`
* mark original method as deprecated
* @deprecated tag in javadoc as well
* keep existing interface
---
.../apache/iceberg/actions/ActionsProvider.java | 11 ++++++++
.../action/IcebergSortCompactionBenchmark.java | 29 +++++++++++-----------
.../java/org/apache/iceberg/spark/Spark3Util.java | 14 +++++++++++
.../actions/BaseRewriteDataFilesSparkAction.java | 11 +++++++-
.../apache/iceberg/spark/actions/SparkActions.java | 6 +++++
.../spark/actions/SparkBinPackStrategy.java | 15 +++++++++--
.../procedures/RewriteDataFilesProcedure.java | 9 ++++---
.../spark/actions/TestRewriteDataFilesAction.java | 18 ++++++++------
8 files changed, 86 insertions(+), 27 deletions(-)
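
For context, a sketch of the user-facing effect (names here are illustrative): before this change, the rewrite action read and wrote via table.name(), which yields an unquoted identifier that Spark's parser rejects when a namespace or table name contains dots or dashes. The new overload threads a backtick-quoted identifier through instead:

    import org.apache.iceberg.Table;
    import org.apache.iceberg.spark.Spark3Util;
    import org.apache.iceberg.spark.actions.SparkActions;
    import org.apache.spark.sql.connector.catalog.Identifier;

    class CompactExample {
      // `table` is assumed to be an already-loaded Iceberg table whose
      // namespace/name contain characters that need quoting.
      static void compact(Table table) {
        Identifier ident = Identifier.of(new String[] {"db.special-chars"}, "table.special-chars");
        // Yields: spark_catalog.`db.special-chars`.`table.special-chars`
        String quoted = Spark3Util.quotedFullIdentifier("spark_catalog", ident);
        SparkActions.get().rewriteDataFiles(table, quoted).execute();
      }
    }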
diff --git a/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java b/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java
index f2564ddb7..2a73809d6 100644
--- a/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java
+++ b/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java
@@ -56,11 +56,22 @@ public interface ActionsProvider {
/**
* Instantiates an action to rewrite data files.
+ * @deprecated please use {@link #rewriteDataFiles(Table, String)}
*/
+ @Deprecated
default RewriteDataFiles rewriteDataFiles(Table table) {
throw new UnsupportedOperationException(this.getClass().getName() + " does not implement rewriteDataFiles");
}
+ /**
+ * Instantiates an action to rewrite data files.
+ */
+ default RewriteDataFiles rewriteDataFiles(Table table, String fullIdentifier) {
+ throw new UnsupportedOperationException(
+ this.getClass().getName() + " does not implement rewriteDataFiles(Table, String)"
+ );
+ }
+
/**
* Instantiates an action to expire snapshots.
*/
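
A note on the pattern above: keeping the single-argument method as a deprecated default preserves source and binary compatibility for existing ActionsProvider implementations. As a sketch (hypothetical class name), a provider migrating to the new overload could forward the old signature and fall back to table.name():

    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.ActionsProvider;
    import org.apache.iceberg.actions.RewriteDataFiles;

    // Sketch only: the deprecated overload delegates to the new one, using
    // table.name() as the fallback identifier, mirroring what
    // SparkBinPackStrategy does further down in this patch.
    abstract class ForwardingActionsProvider implements ActionsProvider {
      @Override
      @Deprecated
      public RewriteDataFiles rewriteDataFiles(Table table) {
        return rewriteDataFiles(table, table.name());
      }
    }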
diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
index 8c205037f..17f9e2386 100644
--- a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
+++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
@@ -74,6 +74,7 @@ public class IcebergSortCompactionBenchmark {
private static final String[] NAMESPACE = new String[] {"default"};
private static final String NAME = "sortbench";
private static final Identifier IDENT = Identifier.of(NAMESPACE, NAME);
+ private static final String FULL_IDENT = Spark3Util.quotedFullIdentifier("spark_catalog", IDENT);
private static final int NUM_FILES = 8;
private static final long NUM_ROWS = 7500000L;
private static final long UNIQUE_VALUES = NUM_ROWS / 4;
@@ -106,7 +107,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void sortInt() {
SparkActions.get()
- .rewriteDataFiles(table())
+ .rewriteDataFiles(table(), FULL_IDENT)
.option(BinPackStrategy.REWRITE_ALL, "true")
.sort(SortOrder
.builderFor(table().schema())
@@ -119,7 +120,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void sortInt2() {
SparkActions.get()
- .rewriteDataFiles(table())
+ .rewriteDataFiles(table(), FULL_IDENT)
.option(BinPackStrategy.REWRITE_ALL, "true")
.sort(SortOrder
.builderFor(table().schema())
@@ -133,7 +134,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void sortInt3() {
SparkActions.get()
- .rewriteDataFiles(table())
+ .rewriteDataFiles(table(), FULL_IDENT)
.option(BinPackStrategy.REWRITE_ALL, "true")
.sort(SortOrder
.builderFor(table().schema())
@@ -149,7 +150,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void sortInt4() {
SparkActions.get()
- .rewriteDataFiles(table())
+ .rewriteDataFiles(table(), FULL_IDENT)
.option(BinPackStrategy.REWRITE_ALL, "true")
.sort(SortOrder
.builderFor(table().schema())
@@ -165,7 +166,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void sortString() {
SparkActions.get()
- .rewriteDataFiles(table())
+ .rewriteDataFiles(table(), FULL_IDENT)
.option(BinPackStrategy.REWRITE_ALL, "true")
.sort(SortOrder
.builderFor(table().schema())
@@ -178,7 +179,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void sortFourColumns() {
SparkActions.get()
- .rewriteDataFiles(table())
+ .rewriteDataFiles(table(), FULL_IDENT)
.option(BinPackStrategy.REWRITE_ALL, "true")
.sort(SortOrder
.builderFor(table().schema())
@@ -194,7 +195,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void sortSixColumns() {
SparkActions.get()
- .rewriteDataFiles(table())
+ .rewriteDataFiles(table(), FULL_IDENT)
.option(BinPackStrategy.REWRITE_ALL, "true")
.sort(SortOrder
.builderFor(table().schema())
@@ -212,7 +213,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void zSortInt() {
SparkActions.get()
- .rewriteDataFiles(table())
+ .rewriteDataFiles(table(), FULL_IDENT)
.option(BinPackStrategy.REWRITE_ALL, "true")
.zOrder("intCol")
.execute();
@@ -222,7 +223,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void zSortInt2() {
SparkActions.get()
- .rewriteDataFiles(table())
+ .rewriteDataFiles(table(), FULL_IDENT)
.option(BinPackStrategy.REWRITE_ALL, "true")
.zOrder("intCol", "intCol2")
.execute();
@@ -232,7 +233,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void zSortInt3() {
SparkActions.get()
- .rewriteDataFiles(table())
+ .rewriteDataFiles(table(), FULL_IDENT)
.option(BinPackStrategy.REWRITE_ALL, "true")
.zOrder("intCol", "intCol2", "intCol3")
.execute();
@@ -242,7 +243,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void zSortInt4() {
SparkActions.get()
- .rewriteDataFiles(table())
+ .rewriteDataFiles(table(), FULL_IDENT)
.option(BinPackStrategy.REWRITE_ALL, "true")
.zOrder("intCol", "intCol2", "intCol3", "intCol4")
.execute();
@@ -252,7 +253,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void zSortString() {
SparkActions.get()
- .rewriteDataFiles(table())
+ .rewriteDataFiles(table(), FULL_IDENT)
.option(BinPackStrategy.REWRITE_ALL, "true")
.zOrder("stringCol")
.execute();
@@ -262,7 +263,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void zSortFourColumns() {
SparkActions.get()
- .rewriteDataFiles(table())
+ .rewriteDataFiles(table(), FULL_IDENT)
.option(BinPackStrategy.REWRITE_ALL, "true")
.zOrder("stringCol", "intCol", "dateCol", "doubleCol")
.execute();
@@ -272,7 +273,7 @@ public class IcebergSortCompactionBenchmark {
@Threads(1)
public void zSortSixColumns() {
SparkActions.get()
- .rewriteDataFiles(table())
+ .rewriteDataFiles(table(), FULL_IDENT)
.option(BinPackStrategy.REWRITE_ALL, "true")
.zOrder("stringCol", "intCol", "dateCol", "timestampCol", "doubleCol", "longCol")
.execute();
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
index 94146dbf2..b7d293675 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
@@ -67,6 +67,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException;
import org.apache.spark.sql.catalyst.parser.ParserInterface;
import org.apache.spark.sql.connector.catalog.CatalogManager;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -749,6 +750,19 @@ public class Spark3Util {
return TableIdentifier.of(Namespace.of(identifier.namespace()), identifier.name());
}
+ public static String quotedFullIdentifier(String catalogName, Identifier identifier) {
+ List<String> parts =
+ ImmutableList.<String>builder()
+ .add(catalogName)
+ .addAll(Arrays.asList(identifier.namespace()))
+ .add(identifier.name())
+ .build();
+
+ return CatalogV2Implicits.MultipartIdentifierHelper(
+ JavaConverters.asScalaIteratorConverter(parts.iterator()).asScala().toSeq()
+ ).quoted();
+ }
+
/**
* Use Spark to list all partitions in the table.
*
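
For illustration, a rough Java approximation of the quoting rules that the MultipartIdentifierHelper.quoted() call above inherits from Spark. The authoritative logic lives in org.apache.spark.sql.connector.catalog.CatalogV2Implicits; this sketch only shows the general shape:

    class QuotingSketch {
      // Approximation only: plain alphanumeric/underscore parts pass through;
      // anything else is wrapped in backticks, with embedded backticks doubled.
      static String quotePart(String part) {
        if (part.matches("[a-zA-Z0-9_]+")) {
          return part;
        }
        return "`" + part.replace("`", "``") + "`";
      }
    }

    // quotePart("db")               -> db
    // quotePart("db.special-chars") -> `db.special-chars`
    // quotePart("weird`name")       -> `weird``name`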
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
index 3a8d8a81f..7572cf668 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
@@ -87,6 +87,7 @@ public class BaseRewriteDataFilesSparkAction
);
private final Table table;
+ private final String fullIdentifier;
private Expression filter = Expressions.alwaysTrue();
private int maxConcurrentFileGroupRewrites;
@@ -96,9 +97,17 @@ public class BaseRewriteDataFilesSparkAction
private RewriteJobOrder rewriteJobOrder;
private RewriteStrategy strategy = null;
+ @Deprecated
protected BaseRewriteDataFilesSparkAction(SparkSession spark, Table table) {
super(spark);
this.table = table;
+ this.fullIdentifier = null;
+ }
+
+ protected BaseRewriteDataFilesSparkAction(SparkSession spark, Table table, String fullIdentifier) {
+ super(spark);
+ this.table = table;
+ this.fullIdentifier = fullIdentifier;
}
@Override
@@ -428,7 +437,7 @@ public class BaseRewriteDataFilesSparkAction
}
private BinPackStrategy binPackStrategy() {
- return new SparkBinPackStrategy(table, spark());
+ return new SparkBinPackStrategy(table, fullIdentifier, spark());
}
private SortStrategy sortStrategy() {
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
index d77a13d12..4f6bb3f00 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
@@ -72,10 +72,16 @@ public class SparkActions implements ActionsProvider {
}
@Override
+ @Deprecated
public RewriteDataFiles rewriteDataFiles(Table table) {
return new BaseRewriteDataFilesSparkAction(spark, table);
}
+ @Override
+ public RewriteDataFiles rewriteDataFiles(Table table, String fullIdentifier) {
+ return new BaseRewriteDataFilesSparkAction(spark, table, fullIdentifier);
+ }
+
@Override
public DeleteOrphanFiles deleteOrphanFiles(Table table) {
return new BaseDeleteOrphanFilesSparkAction(spark, table);
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
index 79ff45a99..fcfaeaa75 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
@@ -38,13 +38,24 @@ import org.apache.spark.sql.internal.SQLConf;
public class SparkBinPackStrategy extends BinPackStrategy {
private final Table table;
+ private final String fullIdentifier;
private final SparkSession spark;
private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
private final FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
+ public SparkBinPackStrategy(Table table, String fullIdentifier, SparkSession spark) {
+ this.table = table;
+ this.spark = spark;
+ // Fallback if a quoted identifier is not supplied
+ this.fullIdentifier = fullIdentifier == null ? table.name() : fullIdentifier;
+ }
+
+ @Deprecated
public SparkBinPackStrategy(Table table, SparkSession spark) {
this.table = table;
this.spark = spark;
+ // Fallback if a quoted identifier is not supplied
+ this.fullIdentifier = table.name();
}
@Override
@@ -66,7 +77,7 @@ public class SparkBinPackStrategy extends BinPackStrategy {
.option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
.option(SparkReadOptions.SPLIT_SIZE, splitSize(inputFileSize(filesToRewrite)))
.option(SparkReadOptions.FILE_OPEN_COST, "0")
- .load(table.name());
+ .load(fullIdentifier);
// All files within a file group are written with the same spec, so check the first
boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table.spec());
@@ -82,7 +93,7 @@ public class SparkBinPackStrategy extends BinPackStrategy {
.option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
.option(SparkWriteOptions.DISTRIBUTION_MODE, distributionMode)
.mode("append")
- .save(table.name());
+ .save(fullIdentifier);
return rewriteCoordinator.fetchNewDataFiles(table, groupID);
} finally {
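
The two .load/.save changes above are the heart of the fix: DataFrameReader.load(String) and DataFrameWriter.save(String) hand the string to Spark's multipart-identifier parser, so an unquoted name like spark_catalog.db.special-chars.table fails to parse. A sketch of the read path with a properly quoted identifier (catalog and table names illustrative):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    class ReadExample {
      static Dataset<Row> read(SparkSession spark) {
        // The quoted form parses cleanly even with dots/dashes in each part.
        return spark.read()
            .format("iceberg")
            .load("spark_catalog.`db.special-chars`.`table.special-chars`");
      }
    }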
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
index b33eab6b5..1f6b34fd2 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
@@ -24,6 +24,7 @@ import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -44,7 +45,7 @@ import scala.runtime.BoxedUnit;
/**
* A procedure that rewrites datafiles in a table.
*
- * @see org.apache.iceberg.spark.actions.SparkActions#rewriteDataFiles(Table)
+ * @see org.apache.iceberg.spark.actions.SparkActions#rewriteDataFiles(Table, String)
*/
class RewriteDataFilesProcedure extends BaseProcedure {
@@ -90,7 +91,8 @@ class RewriteDataFilesProcedure extends BaseProcedure {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
return modifyIcebergTable(tableIdent, table -> {
- RewriteDataFiles action = actions().rewriteDataFiles(table);
+ String quotedFullIdentifier = Spark3Util.quotedFullIdentifier(tableCatalog().name(), tableIdent);
+ RewriteDataFiles action = actions().rewriteDataFiles(table, quotedFullIdentifier);
String strategy = args.isNullAt(1) ? null : args.getString(1);
String sortOrderString = args.isNullAt(2) ? null : args.getString(2);
@@ -107,7 +109,8 @@ class RewriteDataFilesProcedure extends BaseProcedure {
}
String where = args.isNullAt(4) ? null : args.getString(4);
- action = checkAndApplyFilter(action, where, table.name());
+
+ action = checkAndApplyFilter(action, where, quotedFullIdentifier);
RewriteDataFiles.Result result = action.execute();
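
End to end, the procedure change means a CALL against a table with special characters now completes; parsing the input already worked before, but the action's internal round-trip through table.name() did not. As a sketch (catalog and table names assumed):

    import org.apache.spark.sql.SparkSession;

    class ProcedureExample {
      static void rewrite(SparkSession spark) {
        // The procedure now quotes the identifier before handing it to the action.
        spark.sql(
            "CALL spark_catalog.system.rewrite_data_files("
                + "table => '`db.special-chars`.`table.special-chars`')");
      }
    }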
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index be1285087..f7394f645 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -135,7 +135,7 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
private RewriteDataFiles basicRewrite(Table table) {
// Always compact regardless of input files
table.refresh();
- return actions().rewriteDataFiles(table).option(BinPackStrategy.MIN_INPUT_FILES, "1");
+ return actions().rewriteDataFiles(table, tableLocation).option(BinPackStrategy.MIN_INPUT_FILES, "1");
}
@Test
@@ -258,7 +258,7 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
rowDelta.commit();
table.refresh();
List<Object[]> expectedRecords = currentData();
- Result result = actions().rewriteDataFiles(table)
+ Result result = actions().rewriteDataFiles(table, tableLocation)
// do not include any file based on bin pack file size configs
.option(BinPackStrategy.MIN_FILE_SIZE_BYTES, "0")
.option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE - 1))
@@ -292,7 +292,7 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
rowDelta.commit();
table.refresh();
List<Object[]> expectedRecords = currentData();
- Result result = actions().rewriteDataFiles(table)
+ Result result = actions().rewriteDataFiles(table, tableLocation)
.option(BinPackStrategy.DELETE_FILE_THRESHOLD, "1")
.execute();
Assert.assertEquals("Action should rewrite 1 data files", 1, result.rewrittenDataFilesCount());
@@ -1122,13 +1122,17 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
Table table = createTable(1);
AssertHelpers.assertThrows("Should be unable to set Strategy more than once", IllegalArgumentException.class,
- "Cannot set strategy", () -> actions().rewriteDataFiles(table).binPack().sort());
+ "Cannot set strategy", () -> actions().rewriteDataFiles(table, tableLocation).binPack().sort());
AssertHelpers.assertThrows("Should be unable to set Strategy more than once", IllegalArgumentException.class,
- "Cannot set strategy", () -> actions().rewriteDataFiles(table).sort().binPack());
+ "Cannot set strategy", () -> actions().rewriteDataFiles(table, tableLocation).sort().binPack());
- AssertHelpers.assertThrows("Should be unable to set Strategy more than once", IllegalArgumentException.class,
- "Cannot set strategy", () -> actions().rewriteDataFiles(table).sort(SortOrder.unsorted()).binPack());
+ AssertHelpers.assertThrows(
+ "Should be unable to set Strategy more than once",
+ IllegalArgumentException.class,
+ "Cannot set strategy",
+ () ->
+ actions().rewriteDataFiles(table, tableLocation).sort(SortOrder.unsorted()).binPack());
}
@Test
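
One detail worth noting in the tests above: they pass tableLocation rather than a catalog identifier because the suite uses Hadoop (path-based) tables, and path loads bypass identifier parsing entirely. A sketch (path assumed):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    class PathLoadExample {
      static Dataset<Row> read(SparkSession spark, String tableLocation) {
        // For Hadoop tables, the "identifier" can simply be the table's location.
        return spark.read().format("iceberg").load(tableLocation);
      }
    }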