You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/12/23 17:07:43 UTC
[incubator-iceberg] branch spark-3 updated: Add session catalog for
Spark 3.0 (#705)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch spark-3
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/spark-3 by this push:
new c50ee1e Add session catalog for Spark 3.0 (#705)
c50ee1e is described below
commit c50ee1e6c84b4f444a300737ecae905a2bd7cf1c
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Mon Dec 23 09:07:33 2019 -0800
Add session catalog for Spark 3.0 (#705)
---
.../iceberg/exceptions/NotIcebergException.java | 39 +++++
.../java/org/apache/iceberg/hive/HiveCatalog.java | 6 +-
.../apache/iceberg/hive/HiveTableOperations.java | 23 +--
.../apache/iceberg/spark/SparkSessionCatalog.java | 190 +++++++++++++++++++++
.../apache/iceberg/spark/source/SparkTable.java | 4 +
5 files changed, 250 insertions(+), 12 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/exceptions/NotIcebergException.java b/api/src/main/java/org/apache/iceberg/exceptions/NotIcebergException.java
new file mode 100644
index 0000000..6066d5d
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/exceptions/NotIcebergException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.exceptions;
+
+/**
+ * Thrown when a table is found but turns out not to be an Iceberg table.
+ */
+public class NotIcebergException extends NoSuchTableException {
+  /**
+   * Creates the exception from a message and its arguments.
+   *
+   * @param message message handed to the parent constructor
+   * @param args arguments handed to the parent constructor together with the message
+   */
+  public NotIcebergException(String message, Object... args) {
+    super(message, args);
+  }
+
+  /**
+   * Creates the exception from a cause, a message, and the message's arguments.
+   *
+   * @param cause underlying failure handed to the parent constructor
+   * @param message message handed to the parent constructor
+   * @param args arguments handed to the parent constructor together with the message
+   */
+  public NotIcebergException(Throwable cause, String message, Object... args) {
+    super(cause, message, args);
+  }
+
+  /**
+   * Throws a {@link NotIcebergException} unless the given condition holds.
+   *
+   * @param test condition that must be true for the check to pass
+   * @param message message used for the exception when the check fails
+   * @param args arguments used for the exception when the check fails
+   */
+  public static void check(boolean test, String message, Object... args) {
+    if (test) {
+      return;
+    }
+
+    throw new NotIcebergException(message, args);
+  }
+}
diff --git a/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
index 9213d7f..fb316f7 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -99,7 +99,7 @@ public class HiveCatalog extends BaseMetastoreCatalog implements Closeable {
@Override
public boolean dropTable(TableIdentifier identifier, boolean purge) {
if (!isValidIdentifier(identifier)) {
- throw new NoSuchTableException("Invalid identifier: %s", identifier);
+ return false;
}
String database = identifier.namespace().level(0);
@@ -126,7 +126,7 @@ public class HiveCatalog extends BaseMetastoreCatalog implements Closeable {
return true;
- } catch (NoSuchObjectException e) {
+ } catch (NoSuchTableException | NoSuchObjectException e) {
return false;
} catch (TException e) {
@@ -151,6 +151,8 @@ public class HiveCatalog extends BaseMetastoreCatalog implements Closeable {
try {
Table table = clients.run(client -> client.getTable(fromDatabase, fromName));
+ HiveTableOperations.validateTableIsIceberg(table, fromDatabase, fromName);
+
table.setDbName(toDatabase);
table.setTableName(to.name());
diff --git a/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index 9871203..2b9c564 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -49,6 +49,7 @@ import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.NotIcebergException;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.thrift.TException;
@@ -89,19 +90,12 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
protected void doRefresh() {
String metadataLocation = null;
try {
- final Table table = metaClients.run(client -> client.getTable(database, tableName));
- String tableType = table.getParameters().get(TABLE_TYPE_PROP);
-
- if (tableType == null || !tableType.equalsIgnoreCase(ICEBERG_TABLE_TYPE_VALUE)) {
- throw new IllegalArgumentException(String.format("Type of %s.%s is %s, not %s",
- database, tableName,
- tableType /* actual type */, ICEBERG_TABLE_TYPE_VALUE /* expected type */));
- }
+ Table table = metaClients.run(client -> client.getTable(database, tableName));
+ validateTableIsIceberg(table, database, tableName);
metadataLocation = table.getParameters().get(METADATA_LOCATION_PROP);
if (metadataLocation == null) {
- String errMsg = String.format("%s.%s is missing %s property", database, tableName, METADATA_LOCATION_PROP);
- throw new IllegalArgumentException(errMsg);
+ throw new NotIcebergException("%s.%s is missing %s property", database, tableName, METADATA_LOCATION_PROP);
}
} catch (NoSuchObjectException e) {
@@ -269,4 +263,13 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
}
}
}
+
+ /**
+  * Checks the {@code TABLE_TYPE_PROP} parameter of a metastore table and rejects tables that are not Iceberg.
+  *
+  * @param table table whose parameters are inspected
+  * @param database database name, used only to build the error message
+  * @param tableName table name, used only to build the error message
+  * @throws NotIcebergException if the parameter is absent or differs from ICEBERG_TABLE_TYPE_VALUE, ignoring case
+  */
+ static void validateTableIsIceberg(Table table, String database, String tableName) {
+   String actualType = table.getParameters().get(TABLE_TYPE_PROP);
+   boolean isIceberg = actualType != null && actualType.equalsIgnoreCase(ICEBERG_TABLE_TYPE_VALUE);
+   if (isIceberg) {
+     return;
+   }
+
+   // arguments: database, table, actual type, expected type
+   throw new NotIcebergException("Type of %s.%s is %s, not %s",
+       database, tableName, actualType, ICEBERG_TABLE_TYPE_VALUE);
+ }
}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java b/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
new file mode 100644
index 0000000..ff9f0fe
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.iceberg.TableProperties;
+import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.connector.catalog.CatalogExtension;
+import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.NamespaceChange;
+import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * A Spark catalog that can also load non-Iceberg tables.
+ * <p>
+ * Table operations are routed either to an Iceberg {@link SparkCatalog} or to the delegate session catalog supplied
+ * through {@link #setDelegateCatalog(CatalogPlugin)}. Namespace operations and table listing always go to the
+ * delegate session catalog.
+ *
+ * @param <T> CatalogPlugin class to avoid casting to TableCatalog and SupportsNamespaces.
+ */
+public class SparkSessionCatalog<T extends TableCatalog & SupportsNamespaces>
+    implements TableCatalog, SupportsNamespaces, CatalogExtension {
+
+  // set by initialize
+  private String name = null;
+  private SparkCatalog icebergCatalog = null;
+  // set by setDelegateCatalog
+  private T sessionCatalog = null;
+  // when true, tables created with the matching provider are created as Iceberg tables
+  private boolean createParquetAsIceberg = false;
+  private boolean createAvroAsIceberg = false;
+
+  /**
+   * Build a {@link SparkCatalog} to be used for Iceberg operations.
+   * <p>
+   * The default implementation creates a new SparkCatalog with the session catalog's name and options.
+   *
+   * @param catalogName catalog name
+   * @param options catalog options
+   * @return a SparkCatalog to be used for Iceberg tables
+   */
+  protected SparkCatalog buildSparkCatalog(String catalogName, CaseInsensitiveStringMap options) {
+    SparkCatalog newCatalog = new SparkCatalog();
+    newCatalog.initialize(catalogName, options);
+    return newCatalog;
+  }
+
+  @Override
+  public String[][] listNamespaces() throws NoSuchNamespaceException {
+    return sessionCatalog.listNamespaces();
+  }
+
+  @Override
+  public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException {
+    return sessionCatalog.listNamespaces(namespace);
+  }
+
+  @Override
+  public Map<String, String> loadNamespaceMetadata(String[] namespace) throws NoSuchNamespaceException {
+    return sessionCatalog.loadNamespaceMetadata(namespace);
+  }
+
+  @Override
+  public void createNamespace(String[] namespace, Map<String, String> metadata) throws NamespaceAlreadyExistsException {
+    sessionCatalog.createNamespace(namespace, metadata);
+  }
+
+  @Override
+  public void alterNamespace(String[] namespace, NamespaceChange... changes) throws NoSuchNamespaceException {
+    sessionCatalog.alterNamespace(namespace, changes);
+  }
+
+  @Override
+  public boolean dropNamespace(String[] namespace) throws NoSuchNamespaceException {
+    return sessionCatalog.dropNamespace(namespace);
+  }
+
+  @Override
+  public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException {
+    // delegate to the session catalog because all tables share the same namespace
+    return sessionCatalog.listTables(namespace);
+  }
+
+  /**
+   * Loads a table from the Iceberg catalog, falling back to the session catalog when the Iceberg catalog reports
+   * that the table does not exist.
+   */
+  @Override
+  public Table loadTable(Identifier ident) throws NoSuchTableException {
+    try {
+      return icebergCatalog.loadTable(ident);
+    } catch (NoSuchTableException e) {
+      return sessionCatalog.loadTable(ident);
+    }
+  }
+
+  /**
+   * Creates a table in the Iceberg catalog or the session catalog, chosen by the "provider" table property.
+   * <p>
+   * A missing provider or "iceberg" creates an Iceberg table. "parquet" and "avro" create Iceberg tables with the
+   * matching default file format when the corresponding catalog option is enabled. Any other case is delegated to
+   * the session catalog.
+   */
+  @Override
+  public Table createTable(Identifier ident, StructType schema, Transform[] partitions,
+                           Map<String, String> properties)
+      throws TableAlreadyExistsException, NoSuchNamespaceException {
+    String provider = properties.get("provider");
+    if (provider == null || "iceberg".equalsIgnoreCase(provider)) {
+      return icebergCatalog.createTable(ident, schema, partitions, properties);
+
+    } else if (createParquetAsIceberg && "parquet".equalsIgnoreCase(provider)) {
+      return icebergCatalog.createTable(ident, schema, partitions, withDefaultFileFormat(properties, "parquet"));
+
+    } else if (createAvroAsIceberg && "avro".equalsIgnoreCase(provider)) {
+      // pass the augmented map: handing over the original properties would drop the avro default file format
+      return icebergCatalog.createTable(ident, schema, partitions, withDefaultFileFormat(properties, "avro"));
+
+    } else {
+      // delegate to the session catalog
+      return sessionCatalog.createTable(ident, schema, partitions, properties);
+    }
+  }
+
+  /**
+   * Returns an immutable copy of the given properties with the default file format property added.
+   * <p>
+   * The copy is built with {@link ImmutableMap.Builder}, so building fails if the given properties already contain
+   * the default file format key.
+   */
+  private static Map<String, String> withDefaultFileFormat(Map<String, String> properties, String format) {
+    ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.builder();
+    propertiesBuilder.putAll(properties);
+    propertiesBuilder.put(TableProperties.DEFAULT_FILE_FORMAT, format);
+    return propertiesBuilder.build();
+  }
+
+  @Override
+  public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
+    // route by existence in the Iceberg catalog; everything else belongs to the session catalog
+    if (icebergCatalog.tableExists(ident)) {
+      return icebergCatalog.alterTable(ident, changes);
+    } else {
+      return sessionCatalog.alterTable(ident, changes);
+    }
+  }
+
+  @Override
+  public boolean dropTable(Identifier ident) {
+    // no need to check table existence to determine which catalog to use. if a table doesn't exist then both are
+    // required to return false.
+    return icebergCatalog.dropTable(ident) || sessionCatalog.dropTable(ident);
+  }
+
+  @Override
+  public void renameTable(Identifier from, Identifier to) throws NoSuchTableException, TableAlreadyExistsException {
+    // rename is not supported by HadoopCatalog. to avoid UnsupportedOperationException for session catalog tables,
+    // check table existence first to ensure that the table belongs to the Iceberg catalog.
+    if (icebergCatalog.tableExists(from)) {
+      icebergCatalog.renameTable(from, to);
+    } else {
+      sessionCatalog.renameTable(from, to);
+    }
+  }
+
+  /**
+   * Stores the catalog name, builds the Iceberg catalog through {@link #buildSparkCatalog}, and reads the
+   * "parquet-enabled" and "avro-enabled" boolean options, keeping the current values when an option is not set.
+   */
+  @Override
+  public final void initialize(String catalogName, CaseInsensitiveStringMap options) {
+    this.name = catalogName;
+    this.icebergCatalog = buildSparkCatalog(catalogName, options);
+    this.createParquetAsIceberg = options.getBoolean("parquet-enabled", createParquetAsIceberg);
+    this.createAvroAsIceberg = options.getBoolean("avro-enabled", createAvroAsIceberg);
+  }
+
+  /**
+   * Sets the session catalog that non-Iceberg operations are delegated to.
+   *
+   * @param sparkCatalog catalog to delegate to; must implement both TableCatalog and SupportsNamespaces
+   * @throws IllegalArgumentException if the catalog does not implement both interfaces
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public void setDelegateCatalog(CatalogPlugin sparkCatalog) {
+    if (sparkCatalog instanceof TableCatalog && sparkCatalog instanceof SupportsNamespaces) {
+      // the instanceof checks above cover both bounds of T
+      this.sessionCatalog = (T) sparkCatalog;
+    } else {
+      throw new IllegalArgumentException("Invalid session catalog: " + sparkCatalog);
+    }
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index 30b8b6b..09fb304 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -119,4 +119,8 @@ public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
return new SparkWriteBuilder(sparkSession(), icebergTable, options);
}
+ /**
+  * Returns the string form of the wrapped Iceberg table.
+  */
+ @Override
+ public String toString() {
+   String description = icebergTable.toString();
+   return description;
+ }
}