You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2019/11/20 09:30:26 UTC

[incubator-iceberg] branch master updated: Add CachingCatalog (#653)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b8e21f  Add CachingCatalog (#653)
7b8e21f is described below

commit 7b8e21fd6469063e9c4d68d945e67f56a572af80
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Wed Nov 20 01:30:18 2019 -0800

    Add CachingCatalog (#653)
---
 .../java/org/apache/iceberg/catalog/Catalog.java   |   3 +-
 .../java/org/apache/iceberg/catalog/Namespace.java |  20 +++++
 .../apache/iceberg/catalog/TableIdentifier.java    |  20 +++++
 .../java/org/apache/iceberg/CachingCatalog.java    | 100 +++++++++++++++++++++
 .../apache/iceberg/CommitCallbackTransaction.java  | 100 +++++++++++++++++++++
 5 files changed, 242 insertions(+), 1 deletion(-)

diff --git a/api/src/main/java/org/apache/iceberg/catalog/Catalog.java b/api/src/main/java/org/apache/iceberg/catalog/Catalog.java
index edaa2d6..46476bf 100644
--- a/api/src/main/java/org/apache/iceberg/catalog/Catalog.java
+++ b/api/src/main/java/org/apache/iceberg/catalog/Catalog.java
@@ -290,7 +290,8 @@ public interface Catalog {
    *
    * @param from identifier of the table to rename
    * @param to new table name
-   * @throws NoSuchTableException if the table does not exist
+   * @throws NoSuchTableException if the table identified by {@code from} does not exist
+   * @throws AlreadyExistsException if a table identified by {@code to} already exists
    */
   void renameTable(TableIdentifier from, TableIdentifier to);
 
diff --git a/api/src/main/java/org/apache/iceberg/catalog/Namespace.java b/api/src/main/java/org/apache/iceberg/catalog/Namespace.java
index 739ede3..2202d5f 100644
--- a/api/src/main/java/org/apache/iceberg/catalog/Namespace.java
+++ b/api/src/main/java/org/apache/iceberg/catalog/Namespace.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.catalog;
 
 import com.google.common.base.Joiner;
+import java.util.Arrays;
 
 /**
  * A namespace in a {@link Catalog}.
@@ -59,6 +60,25 @@ public class Namespace {
   }
 
   @Override
+  public boolean equals(Object other) {
+    // same instance
+    if (other == this) {
+      return true;
+    }
+    // null or a different runtime class is never equal
+    if (other == null || other.getClass() != getClass()) {
+      return false;
+    }
+    // same class: equal exactly when the level arrays hold equal elements in the same order
+    return Arrays.equals(levels, ((Namespace) other).levels);
+  }
+
+  @Override
+  public int hashCode() {
+    return Arrays.hashCode(levels); // hash of the array's contents, matching the Arrays.equals check in equals
+  }
+
+  @Override
   public String toString() {
     return DOT.join(levels);
   }
diff --git a/api/src/main/java/org/apache/iceberg/catalog/TableIdentifier.java b/api/src/main/java/org/apache/iceberg/catalog/TableIdentifier.java
index bd2e213..0e4d898 100644
--- a/api/src/main/java/org/apache/iceberg/catalog/TableIdentifier.java
+++ b/api/src/main/java/org/apache/iceberg/catalog/TableIdentifier.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
 import java.util.Arrays;
+import java.util.Objects;
 
 /**
  * Identifies a table in iceberg catalog.
@@ -77,6 +78,25 @@ public class TableIdentifier {
   }
 
   @Override
+  public boolean equals(Object other) {
+    // same instance
+    if (other == this) {
+      return true;
+    }
+    // null or a different runtime class is never equal
+    if (other == null || other.getClass() != getClass()) {
+      return false;
+    }
+    TableIdentifier otherIdent = (TableIdentifier) other;
+    return namespace.equals(otherIdent.namespace) && name.equals(otherIdent.name);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(namespace, name); // combines the same two fields that equals compares
+  }
+
+  @Override
   public String toString() {
     return namespace.toString() + "." + name;
   }
diff --git a/core/src/main/java/org/apache/iceberg/CachingCatalog.java b/core/src/main/java/org/apache/iceberg/CachingCatalog.java
new file mode 100644
index 0000000..8b2e483
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/CachingCatalog.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+
+public class CachingCatalog implements Catalog {
+  // Catalog decorator that caches loaded tables by identifier; use wrap(Catalog) to create one.
+  public static Catalog wrap(Catalog catalog) {
+    return new CachingCatalog(catalog);
+  }
+
+  private final Cache<TableIdentifier, Table> tableCache = Caffeine.newBuilder()
+      .softValues() // values are softly referenced, so the GC may reclaim cached tables
+      .expireAfterAccess(1, TimeUnit.MINUTES) // entries expire one minute after their last access
+      .build();
+  private final Catalog catalog;
+
+  private CachingCatalog(Catalog catalog) {
+    this.catalog = catalog;
+  }
+
+  @Override
+  public Table loadTable(TableIdentifier ident) {
+    return tableCache.get(ident, catalog::loadTable); // the wrapped catalog is only asked on a cache miss
+  }
+
+  @Override
+  public Table createTable(TableIdentifier ident, Schema schema, PartitionSpec spec, String location,
+                           Map<String, String> properties) {
+    AtomicBoolean created = new AtomicBoolean(false);
+    Table table = tableCache.get(ident, identifier -> {
+      created.set(true); // the loader only runs when no table is cached for ident
+      return catalog.createTable(identifier, schema, spec, location, properties);
+    });
+
+    if (!created.get()) {
+      throw new AlreadyExistsException("Table already exists: %s", ident);
+    }
+
+    return table;
+  }
+
+  @Override
+  public Transaction newCreateTableTransaction(TableIdentifier ident, Schema schema, PartitionSpec spec,
+                                               String location, Map<String, String> properties) {
+    // create a new transaction without altering the cache. the table doesn't exist until the transaction is committed.
+    // if the table is created before the transaction commits, any cached version is correct and the transaction create
+    // will fail. if the transaction commits before another create, then the cache will be empty.
+    return catalog.newCreateTableTransaction(ident, schema, spec, location, properties);
+  }
+
+  @Override
+  public Transaction newReplaceTableTransaction(TableIdentifier ident, Schema schema, PartitionSpec spec,
+                                                String location, Map<String, String> properties, boolean orCreate) {
+    // create a new transaction without altering the cache. the table doesn't change until the transaction is committed.
+    // when the transaction commits, invalidate the table in the cache if it is present.
+    return CommitCallbackTransaction.addCallback(
+        catalog.newReplaceTableTransaction(ident, schema, spec, location, properties, orCreate),
+        () -> tableCache.invalidate(ident));
+  }
+
+  @Override
+  public boolean dropTable(TableIdentifier ident, boolean purge) {
+    boolean dropped = catalog.dropTable(ident, purge); // forward the caller's purge flag unchanged
+    tableCache.invalidate(ident); // evict regardless of the result so a dropped table is never served
+    return dropped;
+  }
+
+  @Override
+  public void renameTable(TableIdentifier from, TableIdentifier to) {
+    catalog.renameTable(from, to); // a failed rename throws before the cache is touched
+    tableCache.invalidate(from); // the old identifier no longer names a table
+  }
+
+}
diff --git a/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java b/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java
new file mode 100644
index 0000000..8063818
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+class CommitCallbackTransaction implements Transaction { // decorator: runs a callback after commit
+  static Transaction addCallback(Transaction txn, Runnable callback) {
+    return new CommitCallbackTransaction(txn, callback);
+  }
+
+  private final Transaction delegate; // performs all of the real work
+  private final Runnable afterCommit; // invoked after the delegate's commit returns
+
+  private CommitCallbackTransaction(Transaction delegate, Runnable afterCommit) {
+    this.delegate = delegate;
+    this.afterCommit = afterCommit;
+  }
+
+  @Override
+  public void commitTransaction() {
+    delegate.commitTransaction(); // if this throws, the hook below is skipped
+    afterCommit.run();
+  }
+
+  @Override
+  public Table table() {
+    return delegate.table();
+  }
+
+  @Override
+  public UpdateSchema updateSchema() {
+    return delegate.updateSchema();
+  }
+
+  @Override
+  public UpdateProperties updateProperties() {
+    return delegate.updateProperties();
+  }
+
+  @Override
+  public UpdateLocation updateLocation() {
+    return delegate.updateLocation();
+  }
+
+  @Override
+  public AppendFiles newFastAppend() {
+    return delegate.newFastAppend();
+  }
+
+  @Override
+  public AppendFiles newAppend() {
+    return delegate.newAppend();
+  }
+
+  @Override
+  public RewriteFiles newRewrite() {
+    return delegate.newRewrite();
+  }
+
+  @Override
+  public RewriteManifests rewriteManifests() {
+    return delegate.rewriteManifests();
+  }
+
+  @Override
+  public OverwriteFiles newOverwrite() {
+    return delegate.newOverwrite();
+  }
+
+  @Override
+  public ReplacePartitions newReplacePartitions() {
+    return delegate.newReplacePartitions();
+  }
+
+  @Override
+  public DeleteFiles newDelete() {
+    return delegate.newDelete();
+  }
+
+  @Override
+  public ExpireSnapshots expireSnapshots() {
+    return delegate.expireSnapshots();
+  }
+}