Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/12/11 01:19:48 UTC

[incubator-iceberg] 03/03: Add a SparkCatalog implementation.

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch spark-3
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git

commit 6f42a9fe58a692ed571ca5b49a7537c5a03a0ae3
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Tue Dec 10 12:41:31 2019 -0800

    Add a SparkCatalog implementation.
---
 .../org/apache/iceberg/hadoop/HadoopCatalog.java   |  18 +-
 .../java/org/apache/iceberg/hive/HiveCatalog.java  |  19 +-
 .../org/apache/iceberg/spark/SparkCatalog.java     | 252 +++++++++++++++++++++
 .../apache/iceberg/spark/source/SparkTable.java    |   8 +-
 .../iceberg/spark/source/StagedSparkTable.java     |  42 ++++
 5 files changed, 331 insertions(+), 8 deletions(-)
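
For context: SparkCatalog plugs into Spark 3's catalog plugin mechanism, which
instantiates and initializes a catalog for every spark.sql.catalog.<name>
property. A minimal configuration sketch follows; the catalog name "demo" and
all connection values are placeholders, while the option keys ("type", "uri",
"clients", "warehouse", "cache-enabled") are the ones read by the code below.

    import org.apache.spark.sql.SparkSession;

    public class SparkCatalogConfigExample {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("iceberg-catalog-demo")
            // register the adapter under the catalog name "demo"
            .config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
            // "hive" (the default) or "hadoop"; see buildIcebergCatalog()
            .config("spark.sql.catalog.demo.type", "hive")
            // optional: overrides hive.metastore.uris for the HiveCatalog
            .config("spark.sql.catalog.demo.uri", "thrift://metastore-host:9083")
            // optional: Hive client pool size, defaults to 2
            .config("spark.sql.catalog.demo.clients", "4")
            // optional: set to "false" to skip the CachingCatalog wrapper
            .config("spark.sql.catalog.demo.cache-enabled", "true")
            .getOrCreate();

        // tables are then addressed as demo.<namespace>.<table>
        spark.sql("SELECT * FROM demo.db.events").show();
      }
    }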

diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java
index 03108f5..ea9c833 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java
@@ -59,24 +59,37 @@ import org.apache.iceberg.exceptions.RuntimeIOException;
  */
 public class HadoopCatalog extends BaseMetastoreCatalog implements Closeable {
   private static final String ICEBERG_HADOOP_WAREHOUSE_BASE = "iceberg/warehouse";
+  private final String name;
   private final Configuration conf;
   private String warehouseLocation;
 
   /**
    * The constructor of the HadoopCatalog. It uses the passed location as its warehouse directory.
    *
+   * @param name the name of this catalog
    * @param conf The Hadoop configuration
    * @param warehouseLocation The location used as warehouse directory
    */
-  public HadoopCatalog(Configuration conf, String warehouseLocation) {
+  public HadoopCatalog(String name, Configuration conf, String warehouseLocation) {
     Preconditions.checkArgument(warehouseLocation != null && !warehouseLocation.equals(""),
         "no location provided for warehouse");
 
+    this.name = name;
     this.conf = conf;
     this.warehouseLocation = warehouseLocation.replaceAll("/*$", "");
   }
 
   /**
+   * The constructor of the HadoopCatalog. It uses the passed location as its warehouse directory and "hadoop" as the catalog name.
+   *
+   * @param conf The Hadoop configuration
+   * @param warehouseLocation The location used as warehouse directory
+   */
+  public HadoopCatalog(Configuration conf, String warehouseLocation) {
+    this("hadoop", conf, warehouseLocation);
+  }
+
+  /**
   * The constructor of the HadoopCatalog. It gets the value of the <code>fs.defaultFS</code> property
   * from the passed Hadoop configuration as its default file system, and uses the default directory
    * <code>iceberg/warehouse</code> as the warehouse directory.
@@ -84,13 +97,14 @@ public class HadoopCatalog extends BaseMetastoreCatalog implements Closeable {
    * @param conf The Hadoop configuration
    */
   public HadoopCatalog(Configuration conf) {
+    this.name = "hadoop";
     this.conf = conf;
     this.warehouseLocation = conf.get("fs.defaultFS") + "/" + ICEBERG_HADOOP_WAREHOUSE_BASE;
   }
 
   @Override
   protected String name() {
-    return "hadoop";
+    return name;
   }
 
   @Override
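
A minimal sketch of the changed HadoopCatalog constructors; the catalog name
"hadoop_prod" and the HDFS paths are placeholders.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.hadoop.HadoopCatalog;

    public class HadoopCatalogExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();

        // new form: the explicit catalog name is what name() now reports
        HadoopCatalog named = new HadoopCatalog("hadoop_prod", conf, "hdfs://nn:8020/warehouse");

        // old form still compiles; it delegates with the default name "hadoop"
        HadoopCatalog legacy = new HadoopCatalog(conf, "hdfs://nn:8020/warehouse");
      }
    }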
diff --git a/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
index ad9b05d..9213d7f 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -44,15 +44,26 @@ import org.slf4j.LoggerFactory;
 public class HiveCatalog extends BaseMetastoreCatalog implements Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);
 
-  private final HiveClientPool clients;
   private final Configuration conf;
+  private final HiveClientPool clients;
   private final StackTraceElement[] createStack;
+  private final String name;
   private boolean closed;
 
   public HiveCatalog(Configuration conf) {
-    this.clients = new HiveClientPool(2, conf);
-    this.conf = conf;
+    this("hive", null, 2, conf);
+  }
+
+  public HiveCatalog(String name, String uri, int clientPoolSize, Configuration conf) {
+    this.conf = new Configuration(conf);
+    // before building the client pool, overwrite the configuration's URIs if the argument is non-null
+    if (uri != null) {
+      this.conf.set("hive.metastore.uris", uri);
+    }
+
+    this.clients = new HiveClientPool(clientPoolSize, this.conf);
     this.createStack = Thread.currentThread().getStackTrace();
+    this.name = name;
     this.closed = false;
   }
 
@@ -82,7 +93,7 @@ public class HiveCatalog extends BaseMetastoreCatalog implements Closeable {
 
   @Override
   protected String name() {
-    return "hive";
+    return name;
   }
 
   @Override
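
A minimal sketch of the expanded HiveCatalog constructor; the name, metastore
URI, and pool size are placeholders. Passing a null URI keeps whatever
hive.metastore.uris is already set in the Configuration.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.hive.HiveCatalog;

    public class HiveCatalogExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();

        // name, metastore URI override, client pool size, base configuration
        HiveCatalog catalog = new HiveCatalog("hive_prod", "thrift://metastore-host:9083", 4, conf);

        // the one-argument form is unchanged: name "hive", no URI override, pool of 2
        HiveCatalog defaults = new HiveCatalog(conf);
      }
    }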
diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
new file mode 100644
index 0000000..cd81e8b
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.spark.source.SparkTable;
+import org.apache.iceberg.spark.source.StagedSparkTable;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.StagedTable;
+import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
+import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.catalog.TableChange.RemoveProperty;
+import org.apache.spark.sql.connector.catalog.TableChange.SetProperty;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * A Spark TableCatalog implementation that wraps Iceberg's {@link Catalog} interface.
+ */
+public class SparkCatalog implements StagingTableCatalog {
+  private String catalogName = null;
+  private Catalog icebergCatalog = null;
+
+  /**
+   * Build an Iceberg {@link Catalog} to be used by this Spark catalog adapter.
+   *
+   * @param name Spark's catalog name
+   * @param options Spark's catalog options
+   * @return an Iceberg catalog
+   */
+  protected Catalog buildIcebergCatalog(String name, CaseInsensitiveStringMap options) {
+    Configuration conf = SparkSession.active().sparkContext().hadoopConfiguration();
+    String catalogType = options.getOrDefault("type", "hive");
+    switch (catalogType) {
+      case "hive":
+        int clientPoolSize = options.getInt("clients", 2);
+        String uri = options.get("uri");
+        return new HiveCatalog(name, uri, clientPoolSize, conf);
+
+      case "hadoop":
+        String warehouseLocation = options.get("warehouse");
+        return new HadoopCatalog(name, conf, warehouseLocation);
+
+      default:
+        throw new UnsupportedOperationException("Unknown catalog type: " + catalogType);
+    }
+  }
+
+  /**
+   * Build an Iceberg {@link TableIdentifier} for the given Spark identifier.
+   *
+   * @param identifier Spark's identifier
+   * @return an Iceberg identifier
+   */
+  protected TableIdentifier buildIdentifier(Identifier identifier) {
+    return TableIdentifier.of(Namespace.of(identifier.namespace()), identifier.name());
+  }
+
+  @Override
+  public Identifier[] listTables(String[] namespace) {
+    // TODO: handle namespaces
+    return new Identifier[0];
+  }
+
+  @Override
+  public SparkTable loadTable(Identifier ident) throws NoSuchTableException {
+    try {
+      return new SparkTable(icebergCatalog.loadTable(buildIdentifier(ident)));
+    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+      throw new NoSuchTableException(ident);
+    }
+  }
+
+  @Override
+  public SparkTable createTable(Identifier ident, StructType schema, Transform[] transforms,
+                                Map<String, String> properties) throws TableAlreadyExistsException {
+    Schema icebergSchema = SparkSchemaUtil.convert(schema);
+    try {
+      return new SparkTable(icebergCatalog.createTable(buildIdentifier(ident),
+          icebergSchema, SparkUtil.toPartitionSpec(icebergSchema, transforms), properties.get("location"), properties));
+    } catch (AlreadyExistsException e) {
+      throw new TableAlreadyExistsException(ident);
+    }
+  }
+
+  @Override
+  public StagedTable stageCreate(Identifier ident, StructType schema, Transform[] transforms,
+                                 Map<String, String> properties) throws TableAlreadyExistsException {
+    Schema icebergSchema = SparkSchemaUtil.convert(schema);
+    try {
+      return new StagedSparkTable(icebergCatalog.newCreateTableTransaction(buildIdentifier(ident), icebergSchema,
+          SparkUtil.toPartitionSpec(icebergSchema, transforms), properties.get("location"), properties));
+    } catch (AlreadyExistsException e) {
+      throw new TableAlreadyExistsException(ident);
+    }
+  }
+
+  @Override
+  public StagedTable stageReplace(Identifier ident, StructType schema, Transform[] transforms,
+                                  Map<String, String> properties) throws NoSuchTableException {
+    Schema icebergSchema = SparkSchemaUtil.convert(schema);
+    try {
+      return new StagedSparkTable(icebergCatalog.newReplaceTableTransaction(buildIdentifier(ident), icebergSchema,
+          SparkUtil.toPartitionSpec(icebergSchema, transforms), properties.get("location"), properties,
+          false /* do not create */));
+    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+      throw new NoSuchTableException(ident);
+    }
+  }
+
+  @Override
+  public StagedTable stageCreateOrReplace(Identifier ident, StructType schema, Transform[] transforms,
+                                          Map<String, String> properties) {
+    Schema icebergSchema = SparkSchemaUtil.convert(schema);
+    return new StagedSparkTable(icebergCatalog.newReplaceTableTransaction(buildIdentifier(ident), icebergSchema,
+        SparkUtil.toPartitionSpec(icebergSchema, transforms), properties.get("location"), properties,
+        true /* create or replace */));
+  }
+
+  @Override
+  public SparkTable alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
+    SetProperty setLocation = null;
+    SetProperty setSnapshotId = null;
+    List<TableChange> propertyChanges = Lists.newArrayList();
+    List<TableChange> schemaChanges = Lists.newArrayList();
+
+    for (TableChange change : changes) {
+      if (change instanceof SetProperty) {
+        SetProperty set = (SetProperty) change;
+        if ("location".equalsIgnoreCase(set.property())) {
+          setLocation = set;
+        } else if ("current-snapshot-id".equalsIgnoreCase(set.property())) {
+          setSnapshotId = set;
+        } else {
+          propertyChanges.add(set);
+        }
+      } else if (change instanceof RemoveProperty) {
+        propertyChanges.add(change);
+      } else {
+        schemaChanges.add(change);
+      }
+    }
+
+    try {
+      Table table = icebergCatalog.loadTable(buildIdentifier(ident));
+
+      if (setSnapshotId != null) {
+        long newSnapshotId = Long.parseLong(setSnapshotId.value());
+        table.rollback().toSnapshotId(newSnapshotId).commit();
+      }
+
+      Transaction transaction = table.newTransaction();
+
+      if (setLocation != null) {
+        transaction.updateLocation()
+            .setLocation(setLocation.value())
+            .commit();
+      }
+
+      if (!propertyChanges.isEmpty()) {
+        SparkUtil.applyPropertyChanges(transaction.updateProperties(), propertyChanges).commit();
+      }
+
+      if (!schemaChanges.isEmpty()) {
+        SparkUtil.applySchemaChanges(transaction.updateSchema(), schemaChanges).commit();
+      }
+
+      transaction.commitTransaction();
+
+      return new SparkTable(table);
+    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+      throw new NoSuchTableException(ident);
+    }
+  }
+
+  @Override
+  public boolean dropTable(Identifier ident) {
+    try {
+      return icebergCatalog.dropTable(buildIdentifier(ident), true);
+    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+      return false;
+    }
+  }
+
+  @Override
+  public void renameTable(Identifier from, Identifier to) throws NoSuchTableException, TableAlreadyExistsException {
+    try {
+      icebergCatalog.renameTable(buildIdentifier(from), buildIdentifier(to));
+    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+      throw new NoSuchTableException(from);
+    } catch (AlreadyExistsException e) {
+      throw new TableAlreadyExistsException(to);
+    }
+  }
+
+  @Override
+  public void invalidateTable(Identifier ident) {
+    try {
+      icebergCatalog.loadTable(buildIdentifier(ident)).refresh();
+    } catch (org.apache.iceberg.exceptions.NoSuchTableException ignored) {
+      // ignore if the table doesn't exist; a missing table cannot be cached
+    }
+  }
+
+  @Override
+  public final void initialize(String name, CaseInsensitiveStringMap options) {
+    boolean cacheEnabled = Boolean.parseBoolean(options.getOrDefault("cache-enabled", "true"));
+    Catalog catalog = buildIcebergCatalog(name, options);
+
+    this.catalogName = name;
+    this.icebergCatalog = cacheEnabled ? CachingCatalog.wrap(catalog) : catalog;
+  }
+
+  @Override
+  public String name() {
+    return catalogName;
+  }
+}
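
A sketch of the SQL paths this catalog enables, assuming a SparkSession
"spark" with the catalog registered under the name "demo" as in the
configuration sketch above; table names and the snapshot id are placeholders.

    // CREATE TABLE is served by createTable()
    spark.sql("CREATE TABLE demo.db.logs (id BIGINT, data STRING) USING iceberg");

    // SET TBLPROPERTIES routes through alterTable(); the reserved keys
    // 'location' and 'current-snapshot-id' are split out above, and the
    // latter rolls the table back to a snapshot instead of setting a property
    spark.sql("ALTER TABLE demo.db.logs SET TBLPROPERTIES ('current-snapshot-id' = '1234567890')");

    // CTAS is served by stageCreate(): the new table becomes visible only
    // when the staged Iceberg transaction commits with the written files
    spark.sql("CREATE TABLE demo.db.logs_copy USING iceberg AS SELECT * FROM demo.db.logs");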
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index f85aac2..30b8b6b 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -35,7 +35,7 @@ import org.apache.spark.sql.connector.write.WriteBuilder;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
-class SparkTable implements org.apache.spark.sql.connector.catalog.Table, SupportsRead, SupportsWrite {
+public class SparkTable implements org.apache.spark.sql.connector.catalog.Table, SupportsRead, SupportsWrite {
 
   private static final Set<TableCapability> CAPABILITIES = ImmutableSet.of(
       TableCapability.BATCH_READ,
@@ -49,7 +49,11 @@ class SparkTable implements org.apache.spark.sql.connector.catalog.Table, Suppor
   private StructType lazyTableSchema = null;
   private SparkSession lazySpark = null;
 
-  SparkTable(Table icebergTable, StructType requestedSchema) {
+  public SparkTable(Table icebergTable) {
+    this(icebergTable, null);
+  }
+
+  public SparkTable(Table icebergTable, StructType requestedSchema) {
     this.icebergTable = icebergTable;
     this.requestedSchema = requestedSchema;
 
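
With SparkTable now public, a loaded Iceberg table can be wrapped directly. A
minimal sketch, with placeholder namespace and table names:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.hive.HiveCatalog;
    import org.apache.iceberg.spark.source.SparkTable;

    public class SparkTableExample {
      public static void main(String[] args) {
        Table table = new HiveCatalog(new Configuration())
            .loadTable(TableIdentifier.of("db", "logs"));

        // the new single-argument constructor requests no projection schema
        SparkTable sparkTable = new SparkTable(table);
      }
    }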
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/StagedSparkTable.java b/spark/src/main/java/org/apache/iceberg/spark/source/StagedSparkTable.java
new file mode 100644
index 0000000..d427621
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/StagedSparkTable.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import org.apache.iceberg.Transaction;
+import org.apache.spark.sql.connector.catalog.StagedTable;
+
+public class StagedSparkTable extends SparkTable implements StagedTable {
+  private final Transaction transaction;
+
+  public StagedSparkTable(Transaction transaction) {
+    super(transaction.table());
+    this.transaction = transaction;
+  }
+
+  @Override
+  public void commitStagedChanges() {
+    transaction.commitTransaction();
+  }
+
+  @Override
+  public void abortStagedChanges() {
+    // TODO: clean up
+  }
+}
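
A sketch of the staged-create lifecycle that Spark drives for CTAS; the
catalog options, identifier, and schema are placeholders, and in real use
Spark itself instantiates and initializes the catalog from configuration.

    import java.util.Collections;
    import org.apache.iceberg.spark.SparkCatalog;
    import org.apache.spark.sql.connector.catalog.Identifier;
    import org.apache.spark.sql.connector.catalog.StagedTable;
    import org.apache.spark.sql.connector.expressions.Transform;
    import org.apache.spark.sql.types.StructType;
    import org.apache.spark.sql.util.CaseInsensitiveStringMap;

    public class StagedCreateExample {
      public static void main(String[] args) throws Exception {
        SparkCatalog catalog = new SparkCatalog();
        catalog.initialize("demo",
            new CaseInsensitiveStringMap(Collections.singletonMap("type", "hive")));

        StagedTable staged = catalog.stageCreate(
            Identifier.of(new String[] {"db"}, "logs_copy"),
            new StructType().add("id", "long").add("data", "string"),
            new Transform[0],
            Collections.emptyMap());

        // ... Spark writes data files through the staged table here ...

        staged.commitStagedChanges();  // commits the underlying Iceberg transaction
        // on failure Spark calls abortStagedChanges() instead (cleanup is a TODO)
      }
    }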