You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/12/23 17:07:43 UTC

[incubator-iceberg] branch spark-3 updated: Add session catalog for Spark 3.0 (#705)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch spark-3
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/spark-3 by this push:
     new c50ee1e  Add session catalog for Spark 3.0 (#705)
c50ee1e is described below

commit c50ee1e6c84b4f444a300737ecae905a2bd7cf1c
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Mon Dec 23 09:07:33 2019 -0800

    Add session catalog for Spark 3.0 (#705)
---
 .../iceberg/exceptions/NotIcebergException.java    |  39 +++++
 .../java/org/apache/iceberg/hive/HiveCatalog.java  |   6 +-
 .../apache/iceberg/hive/HiveTableOperations.java   |  23 +--
 .../apache/iceberg/spark/SparkSessionCatalog.java  | 190 +++++++++++++++++++++
 .../apache/iceberg/spark/source/SparkTable.java    |   4 +
 5 files changed, 250 insertions(+), 12 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/exceptions/NotIcebergException.java b/api/src/main/java/org/apache/iceberg/exceptions/NotIcebergException.java
new file mode 100644
index 0000000..6066d5d
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/exceptions/NotIcebergException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.exceptions;
+
+/**
+ * Exception raised when a table is found but is not an Iceberg table.
+ * <p>
+ * Because this type extends {@link NoSuchTableException}, any code that already handles a missing table also
+ * handles a table that exists but is not managed by Iceberg.
+ */
+public class NotIcebergException extends NoSuchTableException {
+  /**
+   * @param message message format (used with {@code %s} placeholders)
+   * @param args values for the message format's placeholders
+   */
+  public NotIcebergException(String message, Object... args) {
+    super(message, args);
+  }
+
+  /**
+   * @param cause the underlying failure
+   * @param message message format (used with {@code %s} placeholders)
+   * @param args values for the message format's placeholders
+   */
+  public NotIcebergException(Throwable cause, String message, Object... args) {
+    super(cause, message, args);
+  }
+
+  /**
+   * Throws a {@link NotIcebergException} built from {@code message} and {@code args} unless {@code test} holds.
+   *
+   * @param test condition that must be true for the check to pass
+   * @param message message format (used with {@code %s} placeholders)
+   * @param args values for the message format's placeholders
+   */
+  public static void check(boolean test, String message, Object... args) {
+    if (test) {
+      return;
+    }
+    throw new NotIcebergException(message, args);
+  }
+}
diff --git a/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
index 9213d7f..fb316f7 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -99,7 +99,7 @@ public class HiveCatalog extends BaseMetastoreCatalog implements Closeable {
   @Override
   public boolean dropTable(TableIdentifier identifier, boolean purge) {
     if (!isValidIdentifier(identifier)) {
-      throw new NoSuchTableException("Invalid identifier: %s", identifier);
+      return false;
     }
 
     String database = identifier.namespace().level(0);
@@ -126,7 +126,7 @@ public class HiveCatalog extends BaseMetastoreCatalog implements Closeable {
 
       return true;
 
-    } catch (NoSuchObjectException e) {
+    } catch (NoSuchTableException | NoSuchObjectException e) {
       return false;
 
     } catch (TException e) {
@@ -151,6 +151,8 @@ public class HiveCatalog extends BaseMetastoreCatalog implements Closeable {
 
     try {
       Table table = clients.run(client -> client.getTable(fromDatabase, fromName));
+      HiveTableOperations.validateTableIsIceberg(table, fromDatabase, fromName);
+
       table.setDbName(toDatabase);
       table.setTableName(to.name());
 
diff --git a/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index 9871203..2b9c564 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -49,6 +49,7 @@ import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.NotIcebergException;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.FileIO;
 import org.apache.thrift.TException;
@@ -89,19 +90,12 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
   protected void doRefresh() {
     String metadataLocation = null;
     try {
-      final Table table = metaClients.run(client -> client.getTable(database, tableName));
-      String tableType = table.getParameters().get(TABLE_TYPE_PROP);
-
-      if (tableType == null || !tableType.equalsIgnoreCase(ICEBERG_TABLE_TYPE_VALUE)) {
-        throw new IllegalArgumentException(String.format("Type of %s.%s is %s, not %s",
-            database, tableName,
-            tableType /* actual type */, ICEBERG_TABLE_TYPE_VALUE /* expected type */));
-      }
+      Table table = metaClients.run(client -> client.getTable(database, tableName));
+      validateTableIsIceberg(table, database, tableName);
 
       metadataLocation = table.getParameters().get(METADATA_LOCATION_PROP);
       if (metadataLocation == null) {
-        String errMsg = String.format("%s.%s is missing %s property", database, tableName, METADATA_LOCATION_PROP);
-        throw new IllegalArgumentException(errMsg);
+        throw new NotIcebergException("%s.%s is missing %s property", database, tableName, METADATA_LOCATION_PROP);
       }
 
     } catch (NoSuchObjectException e) {
@@ -269,4 +263,13 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
       }
     }
   }
+
+  /**
+   * Verifies that a Hive metastore table is marked as an Iceberg table.
+   * <p>
+   * The table's {@code TABLE_TYPE_PROP} parameter must equal {@code ICEBERG_TABLE_TYPE_VALUE}, ignoring case; a
+   * missing parameter fails the check.
+   *
+   * @param table the metastore table to inspect
+   * @param database database name, used only in the error message
+   * @param tableName table name, used only in the error message
+   * @throws NotIcebergException if the table type parameter is absent or is not the Iceberg type
+   */
+  static void validateTableIsIceberg(Table table, String database, String tableName) {
+    String actualType = table.getParameters().get(TABLE_TYPE_PROP);
+    // comparing from the constant side makes a null (missing) type simply fail the check
+    boolean isIcebergTable = ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(actualType);
+    NotIcebergException.check(isIcebergTable, "Type of %s.%s is %s, not %s",
+        database, tableName, actualType, ICEBERG_TABLE_TYPE_VALUE);
+  }
 }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java b/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
new file mode 100644
index 0000000..ff9f0fe
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.iceberg.TableProperties;
+import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.connector.catalog.CatalogExtension;
+import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.NamespaceChange;
+import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * A Spark catalog that can also load non-Iceberg tables.
+ * <p>
+ * This {@link CatalogExtension} wraps Spark's built-in session catalog, which Spark supplies through
+ * {@link #setDelegateCatalog(CatalogPlugin)}. Iceberg tables are handled by a {@link SparkCatalog} built in
+ * {@link #initialize(String, CaseInsensitiveStringMap)}. Namespace operations, and tables that the Iceberg catalog
+ * does not own, are delegated to the session catalog.
+ *
+ * @param <T> CatalogPlugin class to avoid casting to TableCatalog and SupportsNamespaces.
+ */
+public class SparkSessionCatalog<T extends TableCatalog & SupportsNamespaces>
+    implements TableCatalog, SupportsNamespaces, CatalogExtension {
+
+  private String name = null;
+  private SparkCatalog icebergCatalog = null;
+  private T sessionCatalog = null;
+  private boolean createParquetAsIceberg = false;
+  private boolean createAvroAsIceberg = false;
+
+  /**
+   * Build a {@link SparkCatalog} to be used for Iceberg operations.
+   * <p>
+   * The default implementation creates a new SparkCatalog with the session catalog's name and options.
+   *
+   * @param catalogName catalog name
+   * @param options catalog options
+   * @return a SparkCatalog to be used for Iceberg tables
+   */
+  protected SparkCatalog buildSparkCatalog(String catalogName, CaseInsensitiveStringMap options) {
+    SparkCatalog newCatalog = new SparkCatalog();
+    newCatalog.initialize(catalogName, options);
+    return newCatalog;
+  }
+
+  @Override
+  public String[][] listNamespaces() throws NoSuchNamespaceException {
+    return sessionCatalog.listNamespaces();
+  }
+
+  @Override
+  public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException {
+    return sessionCatalog.listNamespaces(namespace);
+  }
+
+  @Override
+  public Map<String, String> loadNamespaceMetadata(String[] namespace) throws NoSuchNamespaceException {
+    return sessionCatalog.loadNamespaceMetadata(namespace);
+  }
+
+  @Override
+  public void createNamespace(String[] namespace, Map<String, String> metadata) throws NamespaceAlreadyExistsException {
+    sessionCatalog.createNamespace(namespace, metadata);
+  }
+
+  @Override
+  public void alterNamespace(String[] namespace, NamespaceChange... changes) throws NoSuchNamespaceException {
+    sessionCatalog.alterNamespace(namespace, changes);
+  }
+
+  @Override
+  public boolean dropNamespace(String[] namespace) throws NoSuchNamespaceException {
+    return sessionCatalog.dropNamespace(namespace);
+  }
+
+  @Override
+  public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException {
+    // delegate to the session catalog because all tables share the same namespace
+    return sessionCatalog.listTables(namespace);
+  }
+
+  @Override
+  public Table loadTable(Identifier ident) throws NoSuchTableException {
+    try {
+      return icebergCatalog.loadTable(ident);
+    } catch (NoSuchTableException e) {
+      // the Iceberg catalog does not have it; the session catalog can also load non-Iceberg tables
+      return sessionCatalog.loadTable(ident);
+    }
+  }
+
+  /**
+   * Creates a table in the Iceberg catalog or in the session catalog, chosen by the "provider" property.
+   * <p>
+   * Tables with no provider, or with provider "iceberg", are created as Iceberg tables. When the "parquet-enabled"
+   * or "avro-enabled" catalog options are set, "parquet" and "avro" tables are also created as Iceberg tables, with
+   * {@link TableProperties#DEFAULT_FILE_FORMAT} set to that format. Tables with any other provider are created by the
+   * session catalog with the properties unchanged.
+   */
+  @Override
+  public Table createTable(Identifier ident, StructType schema, Transform[] partitions,
+                           Map<String, String> properties)
+      throws TableAlreadyExistsException, NoSuchNamespaceException {
+    String provider = properties.get("provider");
+    if (provider == null || "iceberg".equalsIgnoreCase(provider)) {
+      return icebergCatalog.createTable(ident, schema, partitions, properties);
+
+    } else if (createParquetAsIceberg && "parquet".equalsIgnoreCase(provider)) {
+      return icebergCatalog.createTable(ident, schema, partitions, withDefaultFileFormat(properties, "parquet"));
+
+    } else if (createAvroAsIceberg && "avro".equalsIgnoreCase(provider)) {
+      return icebergCatalog.createTable(ident, schema, partitions, withDefaultFileFormat(properties, "avro"));
+
+    } else {
+      // delegate to the session catalog
+      return sessionCatalog.createTable(ident, schema, partitions, properties);
+    }
+  }
+
+  /**
+   * Returns an immutable copy of {@code properties} with {@link TableProperties#DEFAULT_FILE_FORMAT} set to
+   * {@code format}.
+   * <p>
+   * Any existing default-format entry is replaced, so the format implied by the provider wins. The existing entry
+   * is skipped rather than added again because {@link ImmutableMap.Builder#build()} rejects duplicate keys.
+   *
+   * @param properties the table properties passed to {@code createTable}
+   * @param format the Iceberg file format name to use as the table's default
+   * @return a new immutable property map
+   */
+  private static Map<String, String> withDefaultFileFormat(Map<String, String> properties, String format) {
+    ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.builder();
+    properties.forEach((key, value) -> {
+      if (!TableProperties.DEFAULT_FILE_FORMAT.equals(key)) {
+        propertiesBuilder.put(key, value);
+      }
+    });
+    propertiesBuilder.put(TableProperties.DEFAULT_FILE_FORMAT, format);
+    return propertiesBuilder.build();
+  }
+
+  @Override
+  public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
+    // route the change to whichever catalog owns the table
+    if (icebergCatalog.tableExists(ident)) {
+      return icebergCatalog.alterTable(ident, changes);
+    } else {
+      return sessionCatalog.alterTable(ident, changes);
+    }
+  }
+
+  @Override
+  public boolean dropTable(Identifier ident) {
+    // no need to check table existence to determine which catalog to use. if a table doesn't exist then both are
+    // required to return false.
+    return icebergCatalog.dropTable(ident) || sessionCatalog.dropTable(ident);
+  }
+
+  @Override
+  public void renameTable(Identifier from, Identifier to) throws NoSuchTableException, TableAlreadyExistsException {
+    // rename is not supported by HadoopCatalog. to avoid UnsupportedOperationException for session catalog tables,
+    // check table existence first to ensure that the table belongs to the Iceberg catalog.
+    if (icebergCatalog.tableExists(from)) {
+      icebergCatalog.renameTable(from, to);
+    } else {
+      sessionCatalog.renameTable(from, to);
+    }
+  }
+
+  /**
+   * Initializes the wrapped Iceberg catalog and reads the "parquet-enabled" and "avro-enabled" options, both of
+   * which default to false.
+   */
+  @Override
+  public final void initialize(String catalogName, CaseInsensitiveStringMap options) {
+    this.name = catalogName;
+    this.icebergCatalog = buildSparkCatalog(catalogName, options);
+    this.createParquetAsIceberg = options.getBoolean("parquet-enabled", createParquetAsIceberg);
+    this.createAvroAsIceberg = options.getBoolean("avro-enabled", createAvroAsIceberg);
+  }
+
+  /**
+   * Sets the session catalog that non-Iceberg operations are delegated to.
+   *
+   * @param sparkCatalog the delegate; must implement both TableCatalog and SupportsNamespaces
+   * @throws IllegalArgumentException if the delegate does not implement both interfaces
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public void setDelegateCatalog(CatalogPlugin sparkCatalog) {
+    if (sparkCatalog instanceof TableCatalog && sparkCatalog instanceof SupportsNamespaces) {
+      // unchecked because T is erased; both interfaces in T's bound are verified above
+      this.sessionCatalog = (T) sparkCatalog;
+    } else {
+      throw new IllegalArgumentException("Invalid session catalog: " + sparkCatalog);
+    }
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index 30b8b6b..09fb304 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -119,4 +119,8 @@ public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
     return new SparkWriteBuilder(sparkSession(), icebergTable, options);
   }
 
+  /**
+   * Returns the string representation of the wrapped Iceberg table.
+   */
+  @Override
+  public String toString() {
+    return this.icebergTable.toString();
+  }
 }