Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/12/11 01:19:48 UTC
[incubator-iceberg] 03/03: Add a SparkCatalog implementation.
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch spark-3
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
commit 6f42a9fe58a692ed571ca5b49a7537c5a03a0ae3
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Tue Dec 10 12:41:31 2019 -0800
Add a SparkCatalog implementation.
---
.../org/apache/iceberg/hadoop/HadoopCatalog.java | 18 +-
.../java/org/apache/iceberg/hive/HiveCatalog.java | 19 +-
.../org/apache/iceberg/spark/SparkCatalog.java | 252 +++++++++++++++++++++
.../apache/iceberg/spark/source/SparkTable.java | 8 +-
.../iceberg/spark/source/StagedSparkTable.java | 42 ++++
5 files changed, 331 insertions(+), 8 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java
index 03108f5..ea9c833 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java
@@ -59,24 +59,37 @@ import org.apache.iceberg.exceptions.RuntimeIOException;
*/
public class HadoopCatalog extends BaseMetastoreCatalog implements Closeable {
private static final String ICEBERG_HADOOP_WAREHOUSE_BASE = "iceberg/warehouse";
+ private final String name;
private final Configuration conf;
private String warehouseLocation;
/**
* The constructor of the HadoopCatalog. It uses the passed location as its warehouse directory.
*
+ * @param name the name of this catalog
* @param conf The Hadoop configuration
* @param warehouseLocation The location used as warehouse directory
*/
- public HadoopCatalog(Configuration conf, String warehouseLocation) {
+ public HadoopCatalog(String name, Configuration conf, String warehouseLocation) {
Preconditions.checkArgument(warehouseLocation != null && !warehouseLocation.equals(""),
"no location provided for warehouse");
+ this.name = name;
this.conf = conf;
this.warehouseLocation = warehouseLocation.replaceAll("/*$", "");
}
/**
+ * The constructor of the HadoopCatalog. It uses the passed location as its warehouse directory.
+ *
+ * @param conf The Hadoop configuration
+ * @param warehouseLocation The location used as warehouse directory
+ */
+ public HadoopCatalog(Configuration conf, String warehouseLocation) {
+ this("hadoop", conf, warehouseLocation);
+ }
+
+ /**
* The constructor of the HadoopCatalog. It gets the value of the <code>fs.defaultFS</code> property
* from the passed Hadoop configuration as its default file system, and uses the default directory
* <code>iceberg/warehouse</code> as the warehouse directory.
@@ -84,13 +97,14 @@ public class HadoopCatalog extends BaseMetastoreCatalog implements Closeable {
* @param conf The Hadoop configuration
*/
public HadoopCatalog(Configuration conf) {
+ this.name = "hadoop";
this.conf = conf;
this.warehouseLocation = conf.get("fs.defaultFS") + "/" + ICEBERG_HADOOP_WAREHOUSE_BASE;
}
@Override
protected String name() {
- return "hadoop";
+ return name;
}
@Override
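
For reference, a minimal sketch of how the HadoopCatalog constructors line up after this change; the catalog name and warehouse path below are illustrative, not taken from the commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.hadoop.HadoopCatalog;

    public class HadoopCatalogExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // new form: the first argument is reported back through name()
        HadoopCatalog named = new HadoopCatalog("prod_hadoop", conf, "hdfs://nn:8020/warehouse");
        // old two-argument form still compiles and defaults the name to "hadoop"
        HadoopCatalog byLocation = new HadoopCatalog(conf, "hdfs://nn:8020/warehouse");
        // conf-only form derives the warehouse from fs.defaultFS + "/iceberg/warehouse"
        HadoopCatalog byConf = new HadoopCatalog(conf);
      }
    }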
diff --git a/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
index ad9b05d..9213d7f 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -44,15 +44,26 @@ import org.slf4j.LoggerFactory;
public class HiveCatalog extends BaseMetastoreCatalog implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);
- private final HiveClientPool clients;
private final Configuration conf;
+ private final HiveClientPool clients;
private final StackTraceElement[] createStack;
+ private final String name;
private boolean closed;
public HiveCatalog(Configuration conf) {
- this.clients = new HiveClientPool(2, conf);
- this.conf = conf;
+ this("hive", null, 2, conf);
+ }
+
+ public HiveCatalog(String name, String uri, int clientPoolSize, Configuration conf) {
+ this.conf = new Configuration(conf);
+ // before building the client pool, overwrite the configuration's URIs if the argument is non-null
+ if (uri != null) {
+ this.conf.set("hive.metastore.uris", uri);
+ }
+
+ this.clients = new HiveClientPool(clientPoolSize, this.conf);
this.createStack = Thread.currentThread().getStackTrace();
+ this.name = name;
this.closed = false;
}
@@ -82,7 +93,7 @@ public class HiveCatalog extends BaseMetastoreCatalog implements Closeable {
@Override
protected String name() {
- return "hive";
+ return name;
}
@Override
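
Similarly, a minimal sketch of the new four-argument HiveCatalog constructor against the old one; the metastore URI here is an illustrative placeholder:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.hive.HiveCatalog;

    public class HiveCatalogExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // new form: name, metastore URI override, client pool size, conf;
        // a non-null URI overwrites hive.metastore.uris on a copy of the conf
        HiveCatalog named = new HiveCatalog("prod_hive", "thrift://metastore:9083", 2, conf);
        // old form delegates to the new one with name "hive", no URI override,
        // and a pool of 2 clients
        HiveCatalog legacy = new HiveCatalog(conf);
      }
    }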
diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
new file mode 100644
index 0000000..cd81e8b
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.spark.source.SparkTable;
+import org.apache.iceberg.spark.source.StagedSparkTable;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.StagedTable;
+import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
+import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.catalog.TableChange.RemoveProperty;
+import org.apache.spark.sql.connector.catalog.TableChange.SetProperty;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * A Spark TableCatalog implementation that wraps Iceberg's {@link Catalog} interface.
+ */
+public class SparkCatalog implements StagingTableCatalog {
+ private String catalogName = null;
+ private Catalog icebergCatalog = null;
+
+ /**
+ * Build an Iceberg {@link Catalog} to be used by this Spark catalog adapter.
+ *
+ * @param name Spark's catalog name
+ * @param options Spark's catalog options
+ * @return an Iceberg catalog
+ */
+ protected Catalog buildIcebergCatalog(String name, CaseInsensitiveStringMap options) {
+ Configuration conf = SparkSession.active().sparkContext().hadoopConfiguration();
+ String catalogType = options.getOrDefault("type", "hive");
+ switch (catalogType) {
+ case "hive":
+ int clientPoolSize = options.getInt("clients", 2);
+ String uri = options.get("uri");
+ return new HiveCatalog(name, uri, clientPoolSize, conf);
+
+ case "hadoop":
+ String warehouseLocation = options.get("warehouse");
+ return new HadoopCatalog(name, conf, warehouseLocation);
+
+ default:
+ throw new UnsupportedOperationException("Unknown catalog type: " + catalogType);
+ }
+ }
+
+ /**
+ * Build an Iceberg {@link TableIdentifier} for the given Spark identifier.
+ *
+ * @param identifier Spark's identifier
+ * @return an Iceberg identifier
+ */
+ protected TableIdentifier buildIdentifier(Identifier identifier) {
+ return TableIdentifier.of(Namespace.of(identifier.namespace()), identifier.name());
+ }
+
+ @Override
+ public Identifier[] listTables(String[] namespace) {
+ // TODO: handle namespaces
+ return new Identifier[0];
+ }
+
+ @Override
+ public SparkTable loadTable(Identifier ident) throws NoSuchTableException {
+ try {
+ return new SparkTable(icebergCatalog.loadTable(buildIdentifier(ident)));
+ } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+ throw new NoSuchTableException(ident);
+ }
+ }
+
+ @Override
+ public SparkTable createTable(Identifier ident, StructType schema, Transform[] transforms,
+ Map<String, String> properties) throws TableAlreadyExistsException {
+ Schema icebergSchema = SparkSchemaUtil.convert(schema);
+ try {
+ return new SparkTable(icebergCatalog.createTable(buildIdentifier(ident),
+ icebergSchema, SparkUtil.toPartitionSpec(icebergSchema, transforms), properties.get("location"), properties));
+ } catch (AlreadyExistsException e) {
+ throw new TableAlreadyExistsException(ident);
+ }
+ }
+
+ @Override
+ public StagedTable stageCreate(Identifier ident, StructType schema, Transform[] transforms,
+ Map<String, String> properties) throws TableAlreadyExistsException {
+ Schema icebergSchema = SparkSchemaUtil.convert(schema);
+ try {
+ return new StagedSparkTable(icebergCatalog.newCreateTableTransaction(buildIdentifier(ident), icebergSchema,
+ SparkUtil.toPartitionSpec(icebergSchema, transforms), properties.get("location"), properties));
+ } catch (AlreadyExistsException e) {
+ throw new TableAlreadyExistsException(ident);
+ }
+ }
+
+ @Override
+ public StagedTable stageReplace(Identifier ident, StructType schema, Transform[] transforms,
+ Map<String, String> properties) throws NoSuchTableException {
+ Schema icebergSchema = SparkSchemaUtil.convert(schema);
+ try {
+ return new StagedSparkTable(icebergCatalog.newReplaceTableTransaction(buildIdentifier(ident), icebergSchema,
+ SparkUtil.toPartitionSpec(icebergSchema, transforms), properties.get("location"), properties,
+ false /* do not create */));
+ } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+ throw new NoSuchTableException(ident);
+ }
+ }
+
+ @Override
+ public StagedTable stageCreateOrReplace(Identifier ident, StructType schema, Transform[] transforms,
+ Map<String, String> properties) {
+ Schema icebergSchema = SparkSchemaUtil.convert(schema);
+ return new StagedSparkTable(icebergCatalog.newReplaceTableTransaction(buildIdentifier(ident), icebergSchema,
+ SparkUtil.toPartitionSpec(icebergSchema, transforms), properties.get("location"), properties,
+ true /* create or replace */));
+ }
+
+ @Override
+ public SparkTable alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
+ SetProperty setLocation = null;
+ SetProperty setSnapshotId = null;
+ List<TableChange> propertyChanges = Lists.newArrayList();
+ List<TableChange> schemaChanges = Lists.newArrayList();
+
+ for (TableChange change : changes) {
+ if (change instanceof SetProperty) {
+ SetProperty set = (SetProperty) change;
+ if ("location".equalsIgnoreCase(set.property())) {
+ setLocation = set;
+ } else if ("current-snapshot-id".equalsIgnoreCase(set.property())) {
+ setSnapshotId = set;
+ } else {
+ propertyChanges.add(set);
+ }
+ } else if (change instanceof RemoveProperty) {
+ propertyChanges.add(change);
+ } else {
+ schemaChanges.add(change);
+ }
+ }
+
+ try {
+ Table table = icebergCatalog.loadTable(buildIdentifier(ident));
+
+ if (setSnapshotId != null) {
+ long newSnapshotId = Long.parseLong(setSnapshotId.value());
+ table.rollback().toSnapshotId(newSnapshotId).commit();
+ }
+
+ Transaction transaction = table.newTransaction();
+
+ if (setLocation != null) {
+ transaction.updateLocation()
+ .setLocation(setLocation.value())
+ .commit();
+ }
+
+ if (!propertyChanges.isEmpty()) {
+ SparkUtil.applyPropertyChanges(transaction.updateProperties(), propertyChanges).commit();
+ }
+
+ if (!schemaChanges.isEmpty()) {
+ SparkUtil.applySchemaChanges(transaction.updateSchema(), schemaChanges).commit();
+ }
+
+ transaction.commitTransaction();
+
+ } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+ throw new NoSuchTableException(ident);
+ }
+ return null;
+ }
+
+ @Override
+ public boolean dropTable(Identifier ident) {
+ try {
+ return icebergCatalog.dropTable(buildIdentifier(ident), true);
+ } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public void renameTable(Identifier from, Identifier to) throws NoSuchTableException, TableAlreadyExistsException {
+ try {
+ icebergCatalog.renameTable(buildIdentifier(from), buildIdentifier(to));
+ } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+ throw new NoSuchTableException(from);
+ } catch (AlreadyExistsException e) {
+ throw new TableAlreadyExistsException(to);
+ }
+ }
+
+ @Override
+ public void invalidateTable(Identifier ident) {
+ try {
+ icebergCatalog.loadTable(buildIdentifier(ident)).refresh();
+ } catch (org.apache.iceberg.exceptions.NoSuchTableException ignored) {
+ // ignore if the table doesn't exist, it is not cached
+ }
+ }
+
+ @Override
+  public final void initialize(String name, CaseInsensitiveStringMap options) {
+ boolean cacheEnabled = Boolean.parseBoolean(options.getOrDefault("cache-enabled", "true"));
+ Catalog catalog = buildIcebergCatalog(name, options);
+
+ this.catalogName = name;
+ this.icebergCatalog = cacheEnabled ? CachingCatalog.wrap(catalog) : catalog;
+ }
+
+ @Override
+ public String name() {
+ return catalogName;
+ }
+}
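
To see how initialize() and buildIcebergCatalog() receive their inputs, here is a hedged sketch of registering this class through Spark 3's catalog plugin configuration; the catalog name "prod", the URI, and the table name are illustrative. Spark passes every "spark.sql.catalog.<name>.<key>" property to initialize() as the CaseInsensitiveStringMap, which is where the "type", "uri", "clients", "warehouse", and "cache-enabled" options above come from:

    import org.apache.spark.sql.SparkSession;

    public class SparkCatalogConfigExample {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .master("local[*]")
            // register the plugin under the catalog name "prod"
            .config("spark.sql.catalog.prod", "org.apache.iceberg.spark.SparkCatalog")
            // these options arrive in initialize() as the CaseInsensitiveStringMap
            .config("spark.sql.catalog.prod.type", "hive")  // or "hadoop"
            .config("spark.sql.catalog.prod.uri", "thrift://metastore:9083")
            .config("spark.sql.catalog.prod.clients", "2")
            .config("spark.sql.catalog.prod.cache-enabled", "true")
            .getOrCreate();

        // tables in this catalog are then addressed as prod.<namespace>.<table>
        spark.sql("SELECT * FROM prod.db.events").show();
      }
    }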
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index f85aac2..30b8b6b 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -35,7 +35,7 @@ import org.apache.spark.sql.connector.write.WriteBuilder;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
-class SparkTable implements org.apache.spark.sql.connector.catalog.Table, SupportsRead, SupportsWrite {
+public class SparkTable implements org.apache.spark.sql.connector.catalog.Table, SupportsRead, SupportsWrite {
private static final Set<TableCapability> CAPABILITIES = ImmutableSet.of(
TableCapability.BATCH_READ,
@@ -49,7 +49,11 @@ class SparkTable implements org.apache.spark.sql.connector.catalog.Table, Suppor
private StructType lazyTableSchema = null;
private SparkSession lazySpark = null;
- SparkTable(Table icebergTable, StructType requestedSchema) {
+ public SparkTable(Table icebergTable) {
+ this(icebergTable, null);
+ }
+
+ public SparkTable(Table icebergTable, StructType requestedSchema) {
this.icebergTable = icebergTable;
this.requestedSchema = requestedSchema;
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/StagedSparkTable.java b/spark/src/main/java/org/apache/iceberg/spark/source/StagedSparkTable.java
new file mode 100644
index 0000000..d427621
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/StagedSparkTable.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import org.apache.iceberg.Transaction;
+import org.apache.spark.sql.connector.catalog.StagedTable;
+
+public class StagedSparkTable extends SparkTable implements StagedTable {
+ private final Transaction transaction;
+
+ public StagedSparkTable(Transaction transaction) {
+ super(transaction.table());
+ this.transaction = transaction;
+ }
+
+ @Override
+ public void commitStagedChanges() {
+ transaction.commitTransaction();
+ }
+
+ @Override
+ public void abortStagedChanges() {
+ // TODO: clean up
+ }
+}
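
To round out the new StagedSparkTable, a hedged sketch of the staged-write lifecycle that Spark's CTAS/RTAS paths drive against a StagingTableCatalog; driveStagedCreate is a hypothetical stand-in for Spark's own execution code, not part of this commit:

    import java.util.Collections;
    import org.apache.spark.sql.connector.catalog.Identifier;
    import org.apache.spark.sql.connector.catalog.StagedTable;
    import org.apache.spark.sql.connector.expressions.Transform;
    import org.apache.spark.sql.types.StructType;

    public class StagedWriteSketch {
      // hypothetical driver: stage the create, write, then commit or abort
      static void driveStagedCreate(org.apache.iceberg.spark.SparkCatalog catalog,
                                    Identifier ident, StructType schema) throws Exception {
        StagedTable staged = catalog.stageCreate(
            ident, schema, new Transform[0], Collections.emptyMap());
        try {
          // ... Spark writes data through the staged table here ...
          staged.commitStagedChanges();  // commits the underlying Iceberg transaction
        } catch (Exception e) {
          staged.abortStagedChanges();   // a TODO in this commit: clean up data files
          throw e;
        }
      }
    }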