You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ja...@apache.org on 2022/02/10 22:35:53 UTC
[iceberg] 02/03: Spark: Fix create table in Hadoop catalog root namespace (#4024)
This is an automated email from the ASF dual-hosted git repository.
jackye pushed a commit to branch 0.13.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
commit 614ec112ffc8fbf419cd48194dc30e5342a7cb23
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Thu Feb 3 06:06:16 2022 +0800
Spark: Fix create table in Hadoop catalog root namespace (#4024)
---
spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java | 6 ++++++
spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java | 11 +++++++++++
spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java | 6 ++++++
spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java | 11 +++++++++++
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java | 6 ++++++
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java | 11 +++++++++++
6 files changed, 51 insertions(+)
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index ae921c5..d80b9bf 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -488,6 +488,10 @@ public class SparkCatalog extends BaseCatalog {
return Pair.of(icebergCatalog.loadTable(buildIdentifier(ident)), null);
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+ if (ident.namespace().length == 0) {
+ throw e;
+ }
+
// if the original load didn't work, the identifier may be extended and include a snapshot selector
TableIdentifier namespaceAsIdent = buildIdentifier(namespaceToIdentifier(ident.namespace()));
Table table;
@@ -567,6 +571,8 @@ public class SparkCatalog extends BaseCatalog {
}
private Identifier namespaceToIdentifier(String[] namespace) {
+ Preconditions.checkArgument(namespace.length > 0,
+ "Cannot convert empty namespace to identifier");
String[] ns = Arrays.copyOf(namespace, namespace.length - 1);
String name = namespace[ns.length];
return Identifier.of(ns, name);
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
index 303cbb5..0a4c936 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
@@ -68,6 +68,17 @@ public class TestCreateTable extends SparkCatalogTestBase {
}
@Test
+ public void testCreateTableInRootNamespace() {
+ Assume.assumeTrue("Hadoop has no default namespace configured", "testhadoop".equals(catalogName));
+
+ try {
+ sql("CREATE TABLE %s.table (id bigint) USING iceberg", catalogName);
+ } finally {
+ sql("DROP TABLE IF EXISTS %s.table", catalogName);
+ }
+ }
+
+ @Test
public void testCreateTableUsingParquet() {
Assume.assumeTrue(
"Not working with session catalog because Spark will not use v2 for a Parquet table",
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index ae921c5..d80b9bf 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -488,6 +488,10 @@ public class SparkCatalog extends BaseCatalog {
return Pair.of(icebergCatalog.loadTable(buildIdentifier(ident)), null);
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+ if (ident.namespace().length == 0) {
+ throw e;
+ }
+
// if the original load didn't work, the identifier may be extended and include a snapshot selector
TableIdentifier namespaceAsIdent = buildIdentifier(namespaceToIdentifier(ident.namespace()));
Table table;
@@ -567,6 +571,8 @@ public class SparkCatalog extends BaseCatalog {
}
private Identifier namespaceToIdentifier(String[] namespace) {
+ Preconditions.checkArgument(namespace.length > 0,
+ "Cannot convert empty namespace to identifier");
String[] ns = Arrays.copyOf(namespace, namespace.length - 1);
String name = namespace[ns.length];
return Identifier.of(ns, name);
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
index 303cbb5..0a4c936 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
@@ -68,6 +68,17 @@ public class TestCreateTable extends SparkCatalogTestBase {
}
@Test
+ public void testCreateTableInRootNamespace() {
+ Assume.assumeTrue("Hadoop has no default namespace configured", "testhadoop".equals(catalogName));
+
+ try {
+ sql("CREATE TABLE %s.table (id bigint) USING iceberg", catalogName);
+ } finally {
+ sql("DROP TABLE IF EXISTS %s.table", catalogName);
+ }
+ }
+
+ @Test
public void testCreateTableUsingParquet() {
Assume.assumeTrue(
"Not working with session catalog because Spark will not use v2 for a Parquet table",
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index ae921c5..d80b9bf 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -488,6 +488,10 @@ public class SparkCatalog extends BaseCatalog {
return Pair.of(icebergCatalog.loadTable(buildIdentifier(ident)), null);
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+ if (ident.namespace().length == 0) {
+ throw e;
+ }
+
// if the original load didn't work, the identifier may be extended and include a snapshot selector
TableIdentifier namespaceAsIdent = buildIdentifier(namespaceToIdentifier(ident.namespace()));
Table table;
@@ -567,6 +571,8 @@ public class SparkCatalog extends BaseCatalog {
}
private Identifier namespaceToIdentifier(String[] namespace) {
+ Preconditions.checkArgument(namespace.length > 0,
+ "Cannot convert empty namespace to identifier");
String[] ns = Arrays.copyOf(namespace, namespace.length - 1);
String name = namespace[ns.length];
return Identifier.of(ns, name);
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
index 303cbb5..0a4c936 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
@@ -68,6 +68,17 @@ public class TestCreateTable extends SparkCatalogTestBase {
}
@Test
+ public void testCreateTableInRootNamespace() {
+ Assume.assumeTrue("Hadoop has no default namespace configured", "testhadoop".equals(catalogName));
+
+ try {
+ sql("CREATE TABLE %s.table (id bigint) USING iceberg", catalogName);
+ } finally {
+ sql("DROP TABLE IF EXISTS %s.table", catalogName);
+ }
+ }
+
+ @Test
public void testCreateTableUsingParquet() {
Assume.assumeTrue(
"Not working with session catalog because Spark will not use v2 for a Parquet table",