You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ru...@apache.org on 2021/08/20 21:48:22 UTC

[iceberg] branch master updated: Spark: Add a method to load an Iceberg table by its name in Spark3. (#2983)

This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c7ba48  Spark: Add a method to load an Iceberg table by its name in Spark3. (#2983)
9c7ba48 is described below

commit 9c7ba48fe885d74ba4026fbaa6d8f510531ad359
Author: Yufei Gu <yu...@apache.org>
AuthorDate: Fri Aug 20 14:48:13 2021 -0700

    Spark: Add a method to load an Iceberg table by its name in Spark3. (#2983)
    
    * Spark: Add a method to load Iceberg table by its name in Spark3.
---
 .../java/org/apache/iceberg/spark/Spark3Util.java  | 22 ++++++++++++++++++++++
 .../org/apache/iceberg/spark/TestSpark3Util.java   | 16 +++++++++++++++-
 2 files changed, 37 insertions(+), 1 deletion(-)

diff --git a/spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java b/spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
index 7b52ba1..e8bf623 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.NullOrder;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.UpdateProperties;
 import org.apache.iceberg.UpdateSchema;
@@ -755,6 +756,27 @@ public class Spark3Util {
     return null;
   }
 
+  /**
+   * Returns an Iceberg Table by its name from a Spark V2 Catalog. If cache is enabled in {@link SparkCatalog},
+   * the {@link TableOperations} of the table may be stale; refresh the table to get the latest one.
+   *
+   * @param spark SparkSession used for looking up catalog references and tables
+   * @param name  The multipart identifier of the Iceberg table
+   * @return an Iceberg table
+   * @throws ParseException if the name cannot be parsed into a catalog and identifier
+   * @throws NoSuchTableException if the catalog cannot find a table for the identifier
+   */
+  public static org.apache.iceberg.Table loadIcebergTable(SparkSession spark, String name)
+      throws ParseException, NoSuchTableException {
+    CatalogAndIdentifier catalogAndIdentifier = catalogAndIdentifier(spark, name);
+    CatalogPlugin catalog = catalogAndIdentifier.catalog;
+    Preconditions.checkArgument(catalog instanceof BaseCatalog, "Catalog %s(%s) is not an Iceberg Catalog",
+        catalog.name(), catalog.getClass().toString());
+
+    Table sparkTable = ((BaseCatalog) catalog).loadTable(catalogAndIdentifier.identifier);
+    return toIcebergTable(sparkTable);
+  }
+
   public static CatalogAndIdentifier catalogAndIdentifier(SparkSession spark, String name) throws ParseException {
     return catalogAndIdentifier(spark, name, spark.sessionState().catalogManager().currentCatalog());
   }
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/TestSpark3Util.java b/spark3/src/test/java/org/apache/iceberg/spark/TestSpark3Util.java
index a14378b..8d57fe1 100644
--- a/spark3/src/test/java/org/apache/iceberg/spark/TestSpark3Util.java
+++ b/spark3/src/test/java/org/apache/iceberg/spark/TestSpark3Util.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.spark;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.SortOrderParser;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.types.Types;
 import org.junit.Assert;
 import org.junit.Test;
@@ -31,7 +32,7 @@ import static org.apache.iceberg.NullOrder.NULLS_LAST;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
-public class TestSpark3Util {
+public class TestSpark3Util extends SparkTestBase {
   @Test
   public void testDescribeSortOrder() {
     Schema schema = new Schema(
@@ -78,6 +79,19 @@ public class TestSpark3Util {
         Spark3Util.describe(schema));
   }
 
+  @Test
+  public void testLoadIcebergTable() throws Exception {
+    // Register a Hive-backed SparkCatalog named "hive" so the multipart name below resolves through it.
+    spark.conf().set("spark.sql.catalog.hive", SparkCatalog.class.getName());
+    spark.conf().set("spark.sql.catalog.hive.type", "hive");
+    spark.conf().set("spark.sql.catalog.hive.default-namespace", "default");
+
+    String tableFullName = "hive.default.tbl";
+    sql("CREATE TABLE %s (c1 bigint, c2 string, c3 string) USING iceberg", tableFullName);
+    Table table = Spark3Util.loadIcebergTable(spark, tableFullName);
+    Assert.assertEquals("Loaded table name should match the requested identifier", tableFullName, table.name());
+  }
+
   private SortOrder buildSortOrder(String transform, Schema schema, int sourceId) {
     String jsonString = "{\n" +
             "  \"order-id\" : 10,\n" +