Posted to commits@iceberg.apache.org by ja...@apache.org on 2022/02/10 22:35:51 UTC

[iceberg] branch 0.13.x updated (7223742 -> 0d9c63e)

This is an automated email from the ASF dual-hosted git repository.

jackye pushed a change to branch 0.13.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git.


    from 7223742  Add version.txt for release 0.13.0
     new 5d599e1  Spark 3.2: Fix predicate pushdown in row-level operations (#4023)
     new 614ec11  Spark: Fix create table in Hadoop catalog root namespace (#4024)
     new 0d9c63e  Flink: Ensure temp manifest names are unique across tasks (#3986)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../iceberg/flink/sink/FlinkManifestUtil.java      |  7 ++--
 .../iceberg/flink/sink/IcebergFilesCommitter.java  |  4 +-
 .../flink/sink/ManifestOutputFileFactory.java      |  8 ++--
 .../iceberg/flink/sink/TestFlinkManifest.java      | 18 ++++++--
 .../flink/sink/TestIcebergFilesCommitter.java      |  8 +++-
 .../org/apache/iceberg/spark/SparkCatalog.java     |  6 +++
 .../apache/iceberg/spark/sql/TestCreateTable.java  | 11 +++++
 .../org/apache/iceberg/spark/SparkCatalog.java     |  6 +++
 .../apache/iceberg/spark/sql/TestCreateTable.java  | 11 +++++
 .../v2/RowLevelCommandScanRelationPushDown.scala   | 32 ++++++++++----
 .../apache/iceberg/spark/extensions/TestMerge.java | 49 ++++++++++++++++++++++
 .../org/apache/iceberg/spark/SparkCatalog.java     |  6 +++
 .../apache/iceberg/spark/sql/TestCreateTable.java  | 11 +++++
 13 files changed, 155 insertions(+), 22 deletions(-)

[iceberg] 03/03: Flink: Ensure temp manifest names are unique across tasks (#3986)

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackye pushed a commit to branch 0.13.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 0d9c63e5101f11463b6a5d6b2ec94dc9cec8dca8
Author: Peidian li <38...@users.noreply.github.com>
AuthorDate: Sat Feb 5 01:38:39 2022 +0800

    Flink: Ensure temp manifest names are unique across tasks (#3986)
---
 .../apache/iceberg/flink/sink/FlinkManifestUtil.java   |  7 ++++---
 .../iceberg/flink/sink/IcebergFilesCommitter.java      |  4 +++-
 .../iceberg/flink/sink/ManifestOutputFileFactory.java  |  8 +++++---
 .../apache/iceberg/flink/sink/TestFlinkManifest.java   | 18 ++++++++++++++----
 .../iceberg/flink/sink/TestIcebergFilesCommitter.java  |  8 ++++++--
 5 files changed, 32 insertions(+), 13 deletions(-)

diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
index b00018b..d208593 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
@@ -61,10 +61,11 @@ class FlinkManifestUtil {
     }
   }
 
-  static ManifestOutputFileFactory createOutputFileFactory(Table table, String flinkJobId, int subTaskId,
-                                                           long attemptNumber) {
+  static ManifestOutputFileFactory createOutputFileFactory(Table table, String flinkJobId, String operatorUniqueId,
+                                                           int subTaskId, long attemptNumber) {
     TableOperations ops = ((HasTableOperations) table).operations();
-    return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, subTaskId, attemptNumber);
+    return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, operatorUniqueId,
+        subTaskId, attemptNumber);
   }
 
   static DeltaManifests writeCompletedFiles(WriteResult result,
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index 8f8bdad..beffff6 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -129,7 +129,9 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
 
     int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
     int attemptId = getRuntimeContext().getAttemptNumber();
-    this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, subTaskId, attemptId);
+    String operatorUniqueId = getRuntimeContext().getOperatorUniqueID();
+    this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorUniqueId,
+        subTaskId, attemptId);
     this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
 
     this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java
index fca8608..b7d575b 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java
@@ -35,23 +35,25 @@ class ManifestOutputFileFactory {
   private final FileIO io;
   private final Map<String, String> props;
   private final String flinkJobId;
+  private final String operatorUniqueId;
   private final int subTaskId;
   private final long attemptNumber;
   private final AtomicInteger fileCount = new AtomicInteger(0);
 
   ManifestOutputFileFactory(TableOperations ops, FileIO io, Map<String, String> props,
-                            String flinkJobId, int subTaskId, long attemptNumber) {
+                            String flinkJobId,  String operatorUniqueId, int subTaskId, long attemptNumber) {
     this.ops = ops;
     this.io = io;
     this.props = props;
     this.flinkJobId = flinkJobId;
+    this.operatorUniqueId = operatorUniqueId;
     this.subTaskId = subTaskId;
     this.attemptNumber = attemptNumber;
   }
 
   private String generatePath(long checkpointId) {
-    return FileFormat.AVRO.addExtension(String.format("%s-%05d-%d-%d-%05d", flinkJobId, subTaskId,
-        attemptNumber, checkpointId, fileCount.incrementAndGet()));
+    return FileFormat.AVRO.addExtension(String.format("%s-%s-%05d-%d-%d-%05d", flinkJobId, operatorUniqueId,
+        subTaskId, attemptNumber, checkpointId, fileCount.incrementAndGet()));
   }
 
   OutputFile create(long checkpointId) {
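
For context on the generatePath() change above: the old name was built only from the job id, subtask index, attempt number, checkpoint id and a counter, so distinct sink operators in the same Flink job writing manifests for the same table could produce identical temp paths; adding the operator unique ID disambiguates them. A minimal standalone sketch of the old versus new pattern (the IDs below are made-up placeholders, not values from this commit, and ".avro" stands in for FileFormat.AVRO.addExtension):

  import java.util.concurrent.atomic.AtomicInteger;

  public class ManifestNameSketch {
    public static void main(String[] args) {
      // Hypothetical IDs for illustration only; real values are UUIDs from the Flink runtime.
      String flinkJobId = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee";
      String operatorUniqueId = "11111111-2222-3333-4444-555555555555";
      int subTaskId = 0;
      long attemptNumber = 0;
      long checkpointId = 1;
      AtomicInteger fileCount = new AtomicInteger(0);

      // Old pattern: identical for two operators sharing job id, subtask index and attempt.
      String oldName = String.format("%s-%05d-%d-%d-%05d.avro",
          flinkJobId, subTaskId, attemptNumber, checkpointId, fileCount.incrementAndGet());

      // New pattern: the operator unique ID keeps names distinct per operator.
      String newName = String.format("%s-%s-%05d-%d-%d-%05d.avro",
          flinkJobId, operatorUniqueId, subTaskId, attemptNumber, checkpointId, fileCount.get());

      System.out.println(oldName);
      System.out.println(newName);
    }
  }
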
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
index c1538bc..4a47656 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -87,9 +87,10 @@ public class TestFlinkManifest {
   @Test
   public void testIO() throws IOException {
     String flinkJobId = newFlinkJobId();
+    String operatorId = newOperatorUniqueId();
     for (long checkpointId = 1; checkpointId <= 3; checkpointId++) {
       ManifestOutputFileFactory factory =
-          FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, 1, 1);
+          FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorId, 1, 1);
       final long curCkpId = checkpointId;
 
       List<DataFile> dataFiles = generateDataFiles(10);
@@ -122,11 +123,12 @@ public class TestFlinkManifest {
   public void testUserProvidedManifestLocation() throws IOException {
     long checkpointId = 1;
     String flinkJobId = newFlinkJobId();
+    String operatorId = newOperatorUniqueId();
     File userProvidedFolder = tempFolder.newFolder();
     Map<String, String> props = ImmutableMap.of(FLINK_MANIFEST_LOCATION, userProvidedFolder.getAbsolutePath() + "///");
     ManifestOutputFileFactory factory = new ManifestOutputFileFactory(
         ((HasTableOperations) table).operations(), table.io(), props,
-        flinkJobId, 1, 1);
+        flinkJobId, operatorId, 1, 1);
 
     List<DataFile> dataFiles = generateDataFiles(5);
     DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles(
@@ -156,7 +158,9 @@ public class TestFlinkManifest {
   public void testVersionedSerializer() throws IOException {
     long checkpointId = 1;
     String flinkJobId = newFlinkJobId();
-    ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, 1, 1);
+    String operatorId = newOperatorUniqueId();
+    ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorId,
+        1, 1);
 
     List<DataFile> dataFiles = generateDataFiles(10);
     List<DeleteFile> eqDeleteFiles = generateEqDeleteFiles(10);
@@ -186,7 +190,9 @@ public class TestFlinkManifest {
     // The v2 deserializer should be able to deserialize the v1 binary.
     long checkpointId = 1;
     String flinkJobId = newFlinkJobId();
-    ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, 1, 1);
+    String operatorId = newOperatorUniqueId();
+    ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorId,
+        1, 1);
 
     List<DataFile> dataFiles = generateDataFiles(10);
     ManifestFile manifest = FlinkManifestUtil.writeDataFiles(factory.create(checkpointId), table.spec(), dataFiles);
@@ -271,4 +277,8 @@ public class TestFlinkManifest {
   private static String newFlinkJobId() {
     return UUID.randomUUID().toString();
   }
+
+  private static String newOperatorUniqueId() {
+    return UUID.randomUUID().toString();
+  }
 }
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index 135fa84..9c23d8a 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -599,8 +599,10 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       harness.snapshot(checkpoint, ++timestamp);
       List<Path> manifestPaths = assertFlinkManifests(1);
       Path manifestPath = manifestPaths.get(0);
+      String operatorId = harness.getOneInputOperator().getOperatorID().toString();
       Assert.assertEquals("File name should have the expected pattern.",
-          String.format("%s-%05d-%d-%d-%05d.avro", jobId, 0, 0, checkpoint, 1), manifestPath.getFileName().toString());
+          String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, checkpoint, 1),
+          manifestPath.getFileName().toString());
 
       // 2. Read the data files from manifests and assert.
       List<DataFile> dataFiles = FlinkManifestUtil.readDataFiles(createTestingManifestFile(manifestPath), table.io());
@@ -640,8 +642,10 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       harness.snapshot(checkpoint, ++timestamp);
       List<Path> manifestPaths = assertFlinkManifests(1);
       Path manifestPath = manifestPaths.get(0);
+      String operatorId = harness.getOneInputOperator().getOperatorID().toString();
       Assert.assertEquals("File name should have the expected pattern.",
-          String.format("%s-%05d-%d-%d-%05d.avro", jobId, 0, 0, checkpoint, 1), manifestPath.getFileName().toString());
+          String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, checkpoint, 1),
+          manifestPath.getFileName().toString());
 
       // 2. Read the data files from manifests and assert.
       List<DataFile> dataFiles = FlinkManifestUtil.readDataFiles(createTestingManifestFile(manifestPath), table.io());

[iceberg] 01/03: Spark 3.2: Fix predicate pushdown in row-level operations (#4023)

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackye pushed a commit to branch 0.13.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 5d599e160787c39b2ded153514ef5dc5a74d890e
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Tue Feb 1 12:46:48 2022 -0800

    Spark 3.2: Fix predicate pushdown in row-level operations (#4023)
---
 .../v2/RowLevelCommandScanRelationPushDown.scala   | 32 ++++++++++----
 .../apache/iceberg/spark/extensions/TestMerge.java | 49 ++++++++++++++++++++++
 2 files changed, 72 insertions(+), 9 deletions(-)

diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RowLevelCommandScanRelationPushDown.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RowLevelCommandScanRelationPushDown.scala
index 52b27d5..4e89b9a 100644
--- a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RowLevelCommandScanRelationPushDown.scala
+++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RowLevelCommandScanRelationPushDown.scala
@@ -20,13 +20,17 @@
 package org.apache.spark.sql.execution.datasources.v2
 
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.AttributeSet
+import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.expressions.PredicateHelper
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.planning.RewrittenRowLevelCommand
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 
 object RowLevelCommandScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
@@ -39,16 +43,12 @@ object RowLevelCommandScanRelationPushDown extends Rule[LogicalPlan] with Predic
       val table = relation.table.asRowLevelOperationTable
       val scanBuilder = table.newScanBuilder(relation.options)
 
-      val filters = command.condition.toSeq
-      val normalizedFilters = DataSourceStrategy.normalizeExprs(filters, relation.output)
-      val (_, normalizedFiltersWithoutSubquery) =
-        normalizedFilters.partition(SubqueryExpression.hasSubquery)
-
-      val (pushedFilters, remainingFilters) = PushDownUtils.pushFilters(
-        scanBuilder, normalizedFiltersWithoutSubquery)
+      val (pushedFilters, remainingFilters) = command.condition match {
+        case Some(cond) => pushFilters(cond, scanBuilder, relation.output)
+        case None => (Nil, Nil)
+      }
 
-      val (scan, output) = PushDownUtils.pruneColumns(
-        scanBuilder, relation, relation.output, Seq.empty)
+      val (scan, output) = PushDownUtils.pruneColumns(scanBuilder, relation, relation.output, Nil)
 
       logInfo(
         s"""
@@ -68,6 +68,20 @@ object RowLevelCommandScanRelationPushDown extends Rule[LogicalPlan] with Predic
       command.withNewRewritePlan(newRewritePlan)
   }
 
+  private def pushFilters(
+      cond: Expression,
+      scanBuilder: ScanBuilder,
+      tableAttrs: Seq[AttributeReference]): (Seq[Filter], Seq[Expression]) = {
+
+    val tableAttrSet = AttributeSet(tableAttrs)
+    val filters = splitConjunctivePredicates(cond).filter(_.references.subsetOf(tableAttrSet))
+    val normalizedFilters = DataSourceStrategy.normalizeExprs(filters, tableAttrs)
+    val (_, normalizedFiltersWithoutSubquery) =
+      normalizedFilters.partition(SubqueryExpression.hasSubquery)
+
+    PushDownUtils.pushFilters(scanBuilder, normalizedFiltersWithoutSubquery)
+  }
+
   private def toOutputAttrs(
       schema: StructType,
       relation: DataSourceV2Relation): Seq[AttributeReference] = {
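
The reworked rule above boils down to: split the command's condition into AND-ed conjuncts, keep only the conjuncts whose column references all belong to the target relation, drop subquery predicates, and push the rest into the scan builder. A small self-contained sketch of that selection step, using toy predicate objects rather than the Catalyst and DataSource V2 classes in the patch (names here are illustrative only):

  import java.util.List;
  import java.util.Set;

  public class ConjunctSelectionSketch {
    // Toy stand-in for a predicate: its text plus the columns it references.
    record Pred(String text, Set<String> references) {}

    public static void main(String[] args) {
      Set<String> targetColumns = Set.of("t.id", "t.dep");

      // Conjuncts of the MERGE condition used in the test below:
      // t.id == source.id AND t.dep IN ('software') AND source.id < 10
      List<Pred> conjuncts = List.of(
          new Pred("t.id == source.id", Set.of("t.id", "source.id")),
          new Pred("t.dep IN ('software')", Set.of("t.dep")),
          new Pred("source.id < 10", Set.of("source.id")));

      // Keep only conjuncts that reference target-table columns exclusively;
      // these are the candidates for static pushdown into the target scan.
      conjuncts.stream()
          .filter(p -> targetColumns.containsAll(p.references()))
          .forEach(p -> System.out.println("pushable: " + p.text()));
      // Prints only "t.dep IN ('software')"; the other conjuncts reference the source.
    }
  }
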
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
index 23ba7ad..6537e31 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
@@ -32,6 +32,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -76,6 +79,52 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
   }
 
   @Test
+  public void testMergeWithStaticPredicatePushDown() {
+    createAndInitTable("id BIGINT, dep STRING");
+
+    sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);
+
+    append(tableName,
+        "{ \"id\": 1, \"dep\": \"software\" }\n" +
+        "{ \"id\": 11, \"dep\": \"software\" }\n" +
+        "{ \"id\": 1, \"dep\": \"hr\" }");
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snapshot = table.currentSnapshot();
+    String dataFilesCount = snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP);
+    Assert.assertEquals("Must have 2 files before MERGE", "2", dataFilesCount);
+
+    createOrReplaceView("source",
+        "{ \"id\": 1, \"dep\": \"finance\" }\n" +
+        "{ \"id\": 2, \"dep\": \"hardware\" }");
+
+    // disable dynamic pruning and rely only on static predicate pushdown
+    withSQLConf(ImmutableMap.of(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED().key(), "false"), () -> {
+      sql("MERGE INTO %s t USING source " +
+          "ON t.id == source.id AND t.dep IN ('software') AND source.id < 10 " +
+          "WHEN MATCHED AND source.id = 1 THEN " +
+          "  UPDATE SET dep = source.dep " +
+          "WHEN NOT MATCHED THEN " +
+          "  INSERT (dep, id) VALUES (source.dep, source.id)", tableName);
+    });
+
+    table.refresh();
+
+    Snapshot mergeSnapshot = table.currentSnapshot();
+    String deletedDataFilesCount = mergeSnapshot.summary().get(SnapshotSummary.DELETED_FILES_PROP);
+    Assert.assertEquals("Must overwrite only 1 file", "1", deletedDataFilesCount);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1L, "finance"),  // updated
+        row(1L, "hr"),       // kept
+        row(2L, "hardware"), // new
+        row(11L, "software") // kept
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id, dep", tableName));
+  }
+
+  @Test
   public void testMergeIntoEmptyTargetInsertAllNonMatchingRows() {
     createAndInitTable("id INT, dep STRING");
 

[iceberg] 02/03: Spark: Fix create table in Hadoop catalog root namespace (#4024)

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackye pushed a commit to branch 0.13.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 614ec112ffc8fbf419cd48194dc30e5342a7cb23
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Thu Feb 3 06:06:16 2022 +0800

    Spark: Fix create table in Hadoop catalog root namespace (#4024)
---
 .../src/main/java/org/apache/iceberg/spark/SparkCatalog.java  |  6 ++++++
 .../java/org/apache/iceberg/spark/sql/TestCreateTable.java    | 11 +++++++++++
 .../src/main/java/org/apache/iceberg/spark/SparkCatalog.java  |  6 ++++++
 .../java/org/apache/iceberg/spark/sql/TestCreateTable.java    | 11 +++++++++++
 .../src/main/java/org/apache/iceberg/spark/SparkCatalog.java  |  6 ++++++
 .../java/org/apache/iceberg/spark/sql/TestCreateTable.java    | 11 +++++++++++
 6 files changed, 51 insertions(+)

diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index ae921c5..d80b9bf 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -488,6 +488,10 @@ public class SparkCatalog extends BaseCatalog {
       return Pair.of(icebergCatalog.loadTable(buildIdentifier(ident)), null);
 
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+      if (ident.namespace().length == 0) {
+        throw e;
+      }
+
       // if the original load didn't work, the identifier may be extended and include a snapshot selector
       TableIdentifier namespaceAsIdent = buildIdentifier(namespaceToIdentifier(ident.namespace()));
       Table table;
@@ -567,6 +571,8 @@ public class SparkCatalog extends BaseCatalog {
   }
 
   private Identifier namespaceToIdentifier(String[] namespace) {
+    Preconditions.checkArgument(namespace.length > 0,
+        "Cannot convert empty namespace to identifier");
     String[] ns = Arrays.copyOf(namespace, namespace.length - 1);
     String name = namespace[ns.length];
     return Identifier.of(ns, name);
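
For context on the two guards added above: for a table in the catalog's root namespace, ident.namespace() is empty, so the old fallback inside the NoSuchTableException handler called namespaceToIdentifier on a zero-length array and failed on the array copy instead of surfacing a sensible table-not-found error. A minimal standalone illustration of that failure mode (plain Java, not the actual Spark code path):

  import java.util.Arrays;

  public class EmptyNamespaceSketch {
    public static void main(String[] args) {
      String[] namespace = new String[0]; // root namespace: no levels

      try {
        // Mirrors namespaceToIdentifier's first step: drop the last namespace level.
        String[] ns = Arrays.copyOf(namespace, namespace.length - 1);
        String name = namespace[ns.length];
        System.out.println(name);
      } catch (NegativeArraySizeException e) {
        // With an empty namespace the copy length is -1, which throws here; the patch
        // now rethrows the original NoSuchTableException and adds the precondition instead.
        System.out.println("namespaceToIdentifier on an empty namespace fails: " + e);
      }
    }
  }
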
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
index 303cbb5..0a4c936 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
@@ -68,6 +68,17 @@ public class TestCreateTable extends SparkCatalogTestBase {
   }
 
   @Test
+  public void testCreateTableInRootNamespace() {
+    Assume.assumeTrue("Hadoop has no default namespace configured", "testhadoop".equals(catalogName));
+
+    try {
+      sql("CREATE TABLE %s.table (id bigint) USING iceberg", catalogName);
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.table", catalogName);
+    }
+  }
+
+  @Test
   public void testCreateTableUsingParquet() {
     Assume.assumeTrue(
         "Not working with session catalog because Spark will not use v2 for a Parquet table",
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index ae921c5..d80b9bf 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -488,6 +488,10 @@ public class SparkCatalog extends BaseCatalog {
       return Pair.of(icebergCatalog.loadTable(buildIdentifier(ident)), null);
 
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+      if (ident.namespace().length == 0) {
+        throw e;
+      }
+
       // if the original load didn't work, the identifier may be extended and include a snapshot selector
       TableIdentifier namespaceAsIdent = buildIdentifier(namespaceToIdentifier(ident.namespace()));
       Table table;
@@ -567,6 +571,8 @@ public class SparkCatalog extends BaseCatalog {
   }
 
   private Identifier namespaceToIdentifier(String[] namespace) {
+    Preconditions.checkArgument(namespace.length > 0,
+        "Cannot convert empty namespace to identifier");
     String[] ns = Arrays.copyOf(namespace, namespace.length - 1);
     String name = namespace[ns.length];
     return Identifier.of(ns, name);
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
index 303cbb5..0a4c936 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
@@ -68,6 +68,17 @@ public class TestCreateTable extends SparkCatalogTestBase {
   }
 
   @Test
+  public void testCreateTableInRootNamespace() {
+    Assume.assumeTrue("Hadoop has no default namespace configured", "testhadoop".equals(catalogName));
+
+    try {
+      sql("CREATE TABLE %s.table (id bigint) USING iceberg", catalogName);
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.table", catalogName);
+    }
+  }
+
+  @Test
   public void testCreateTableUsingParquet() {
     Assume.assumeTrue(
         "Not working with session catalog because Spark will not use v2 for a Parquet table",
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index ae921c5..d80b9bf 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -488,6 +488,10 @@ public class SparkCatalog extends BaseCatalog {
       return Pair.of(icebergCatalog.loadTable(buildIdentifier(ident)), null);
 
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+      if (ident.namespace().length == 0) {
+        throw e;
+      }
+
       // if the original load didn't work, the identifier may be extended and include a snapshot selector
       TableIdentifier namespaceAsIdent = buildIdentifier(namespaceToIdentifier(ident.namespace()));
       Table table;
@@ -567,6 +571,8 @@ public class SparkCatalog extends BaseCatalog {
   }
 
   private Identifier namespaceToIdentifier(String[] namespace) {
+    Preconditions.checkArgument(namespace.length > 0,
+        "Cannot convert empty namespace to identifier");
     String[] ns = Arrays.copyOf(namespace, namespace.length - 1);
     String name = namespace[ns.length];
     return Identifier.of(ns, name);
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
index 303cbb5..0a4c936 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
@@ -68,6 +68,17 @@ public class TestCreateTable extends SparkCatalogTestBase {
   }
 
   @Test
+  public void testCreateTableInRootNamespace() {
+    Assume.assumeTrue("Hadoop has no default namespace configured", "testhadoop".equals(catalogName));
+
+    try {
+      sql("CREATE TABLE %s.table (id bigint) USING iceberg", catalogName);
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.table", catalogName);
+    }
+  }
+
+  @Test
   public void testCreateTableUsingParquet() {
     Assume.assumeTrue(
         "Not working with session catalog because Spark will not use v2 for a Parquet table",