You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ru...@apache.org on 2021/08/20 21:48:22 UTC
[iceberg] branch master updated: Spark: Add a method to load an
Iceberg table by its name in Spark3. (#2983)
This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 9c7ba48 Spark: Add a method to load an Iceberg table by its name in Spark3. (#2983)
9c7ba48 is described below
commit 9c7ba48fe885d74ba4026fbaa6d8f510531ad359
Author: Yufei Gu <yu...@apache.org>
AuthorDate: Fri Aug 20 14:48:13 2021 -0700
Spark: Add a method to load an Iceberg table by its name in Spark3. (#2983)
* Spark: Add a method to load an Iceberg table by its name in Spark3.
---
.../java/org/apache/iceberg/spark/Spark3Util.java | 22 ++++++++++++++++++++++
.../org/apache/iceberg/spark/TestSpark3Util.java | 16 +++++++++++++++-
2 files changed, 37 insertions(+), 1 deletion(-)
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java b/spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
index 7b52ba1..e8bf623 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.UpdateSchema;
@@ -755,6 +756,27 @@ public class Spark3Util {
return null;
}
+ /**
+ * Returns an Iceberg Table by its name from a Spark V2 Catalog. If cache is enabled in {@link SparkCatalog},
+ * the {@link TableOperations} of the table may be stale; refresh the table to get the latest one.
+ *
+ * @param spark SparkSession used for looking up catalog references and tables
+ * @param name The multipart identifier of the Iceberg table
+ * @return an Iceberg table
+ */
+ public static org.apache.iceberg.Table loadIcebergTable(SparkSession spark, String name)
+ throws ParseException, NoSuchTableException {
+ CatalogAndIdentifier catalogAndIdentifier = catalogAndIdentifier(spark, name);
+
+ CatalogPlugin catalog = catalogAndIdentifier.catalog;
+ Preconditions.checkArgument(catalog instanceof BaseCatalog, "Catalog %s(%s) is not an Iceberg Catalog",
+ catalog.name(), catalog.getClass().toString());
+ BaseCatalog baseCatalog = (BaseCatalog) catalogAndIdentifier.catalog;
+
+ Table sparkTable = baseCatalog.loadTable(catalogAndIdentifier.identifier);
+ return toIcebergTable(sparkTable);
+ }
+
public static CatalogAndIdentifier catalogAndIdentifier(SparkSession spark, String name) throws ParseException {
return catalogAndIdentifier(spark, name, spark.sessionState().catalogManager().currentCatalog());
}
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/TestSpark3Util.java b/spark3/src/test/java/org/apache/iceberg/spark/TestSpark3Util.java
index a14378b..8d57fe1 100644
--- a/spark3/src/test/java/org/apache/iceberg/spark/TestSpark3Util.java
+++ b/spark3/src/test/java/org/apache/iceberg/spark/TestSpark3Util.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.spark;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.SortOrderParser;
+import org.apache.iceberg.Table;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Test;
@@ -31,7 +32,7 @@ import static org.apache.iceberg.NullOrder.NULLS_LAST;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
-public class TestSpark3Util {
+public class TestSpark3Util extends SparkTestBase {
@Test
public void testDescribeSortOrder() {
Schema schema = new Schema(
@@ -78,6 +79,19 @@ public class TestSpark3Util {
Spark3Util.describe(schema));
}
+ @Test
+ public void testLoadIcebergTable() throws Exception {
+ spark.conf().set("spark.sql.catalog.hive", SparkCatalog.class.getName());
+ spark.conf().set("spark.sql.catalog.hive.type", "hive");
+ spark.conf().set("spark.sql.catalog.hive.default-namespace", "default");
+
+ String tableFullName = "hive.default.tbl";
+ sql("CREATE TABLE %s (c1 bigint, c2 string, c3 string) USING iceberg", tableFullName);
+
+ Table table = Spark3Util.loadIcebergTable(spark, tableFullName);
+ Assert.assertEquals("Loaded table name should match", tableFullName, table.name()); // shows both on failure
+ }
+
private SortOrder buildSortOrder(String transform, Schema schema, int sourceId) {
String jsonString = "{\n" +
" \"order-id\" : 10,\n" +