You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2019/11/20 09:30:26 UTC
[incubator-iceberg] branch master updated: Add CachingCatalog (#653)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 7b8e21f Add CachingCatalog (#653)
7b8e21f is described below
commit 7b8e21fd6469063e9c4d68d945e67f56a572af80
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Wed Nov 20 01:30:18 2019 -0800
Add CachingCatalog (#653)
---
.../java/org/apache/iceberg/catalog/Catalog.java | 3 +-
.../java/org/apache/iceberg/catalog/Namespace.java | 20 +++++
.../apache/iceberg/catalog/TableIdentifier.java | 20 +++++
.../java/org/apache/iceberg/CachingCatalog.java | 100 +++++++++++++++++++++
.../apache/iceberg/CommitCallbackTransaction.java | 100 +++++++++++++++++++++
5 files changed, 242 insertions(+), 1 deletion(-)
diff --git a/api/src/main/java/org/apache/iceberg/catalog/Catalog.java b/api/src/main/java/org/apache/iceberg/catalog/Catalog.java
index edaa2d6..46476bf 100644
--- a/api/src/main/java/org/apache/iceberg/catalog/Catalog.java
+++ b/api/src/main/java/org/apache/iceberg/catalog/Catalog.java
@@ -290,7 +290,8 @@ public interface Catalog {
*
* @param from identifier of the table to rename
* @param to new table name
- * @throws NoSuchTableException if the table does not exist
+ * @throws NoSuchTableException if the from table does not exist
+ * @throws AlreadyExistsException if the to table already exists
*/
void renameTable(TableIdentifier from, TableIdentifier to);
diff --git a/api/src/main/java/org/apache/iceberg/catalog/Namespace.java b/api/src/main/java/org/apache/iceberg/catalog/Namespace.java
index 739ede3..2202d5f 100644
--- a/api/src/main/java/org/apache/iceberg/catalog/Namespace.java
+++ b/api/src/main/java/org/apache/iceberg/catalog/Namespace.java
@@ -20,6 +20,7 @@
package org.apache.iceberg.catalog;
import com.google.common.base.Joiner;
+import java.util.Arrays;
/**
* A namespace in a {@link Catalog}.
@@ -59,6 +60,25 @@ public class Namespace {
}
@Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+
+ if (other == null || getClass() != other.getClass()) {
+ return false;
+ }
+
+ Namespace namespace = (Namespace) other;
+ return Arrays.equals(levels, namespace.levels);
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(levels);
+ }
+
+ @Override
public String toString() {
return DOT.join(levels);
}
diff --git a/api/src/main/java/org/apache/iceberg/catalog/TableIdentifier.java b/api/src/main/java/org/apache/iceberg/catalog/TableIdentifier.java
index bd2e213..0e4d898 100644
--- a/api/src/main/java/org/apache/iceberg/catalog/TableIdentifier.java
+++ b/api/src/main/java/org/apache/iceberg/catalog/TableIdentifier.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import java.util.Arrays;
+import java.util.Objects;
/**
* Identifies a table in iceberg catalog.
@@ -77,6 +78,25 @@ public class TableIdentifier {
}
@Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+
+ if (other == null || getClass() != other.getClass()) {
+ return false;
+ }
+
+ TableIdentifier that = (TableIdentifier) other;
+ return namespace.equals(that.namespace) && name.equals(that.name);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(namespace, name);
+ }
+
+ @Override
public String toString() {
return namespace.toString() + "." + name;
}
diff --git a/core/src/main/java/org/apache/iceberg/CachingCatalog.java b/core/src/main/java/org/apache/iceberg/CachingCatalog.java
new file mode 100644
index 0000000..8b2e483
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/CachingCatalog.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+
+public class CachingCatalog implements Catalog {
+
+ public static Catalog wrap(Catalog catalog) {
+ return new CachingCatalog(catalog);
+ }
+
+ private final Cache<TableIdentifier, Table> tableCache = Caffeine.newBuilder()
+ .softValues()
+ .expireAfterAccess(1, TimeUnit.MINUTES)
+ .build();
+ private final Catalog catalog;
+
+ private CachingCatalog(Catalog catalog) {
+ this.catalog = catalog;
+ }
+
+ @Override
+ public Table loadTable(TableIdentifier ident) {
+ return tableCache.get(ident, catalog::loadTable);
+ }
+
+ @Override
+ public Table createTable(TableIdentifier ident, Schema schema, PartitionSpec spec, String location,
+ Map<String, String> properties) {
+ AtomicBoolean created = new AtomicBoolean(false);
+ Table table = tableCache.get(ident, identifier -> {
+ created.set(true);
+ return catalog.createTable(identifier, schema, spec, location, properties);
+ });
+
+ if (!created.get()) {
+ throw new AlreadyExistsException("Table already exists: %s", ident);
+ }
+
+ return table;
+ }
+
+ @Override
+ public Transaction newCreateTableTransaction(TableIdentifier ident, Schema schema, PartitionSpec spec,
+ String location, Map<String, String> properties) {
+ // create a new transaction without altering the cache. the table doesn't exist until the transaction is committed.
+ // if the table is created before the transaction commits, any cached version is correct and the transaction create
+ // will fail. if the transaction commits before another create, then the cache will be empty.
+ return catalog.newCreateTableTransaction(ident, schema, spec, location, properties);
+ }
+
+ @Override
+ public Transaction newReplaceTableTransaction(TableIdentifier ident, Schema schema, PartitionSpec spec,
+ String location, Map<String, String> properties, boolean orCreate) {
+ // create a new transaction without altering the cache. the table doesn't change until the transaction is committed.
+ // when the transaction commits, invalidate the table in the cache if it is present.
+ return CommitCallbackTransaction.addCallback(
+ catalog.newReplaceTableTransaction(ident, schema, spec, location, properties, orCreate),
+ () -> tableCache.invalidate(ident));
+ }
+
+ @Override
+ public boolean dropTable(TableIdentifier ident, boolean purge) {
+ boolean dropped = catalog.dropTable(ident, false);
+ tableCache.invalidate(ident);
+ return dropped;
+ }
+
+ @Override
+ public void renameTable(TableIdentifier from, TableIdentifier to) {
+ catalog.renameTable(from, to);
+ tableCache.invalidate(from);
+ }
+
+}
diff --git a/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java b/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java
new file mode 100644
index 0000000..8063818
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+class CommitCallbackTransaction implements Transaction {
+ static Transaction addCallback(Transaction txn, Runnable callback) {
+ return new CommitCallbackTransaction(txn, callback);
+ }
+
+ private final Transaction wrapped;
+ private final Runnable callback;
+
+ private CommitCallbackTransaction(Transaction wrapped, Runnable callback) {
+ this.wrapped = wrapped;
+ this.callback = callback;
+ }
+
+ @Override
+ public Table table() {
+ return wrapped.table();
+ }
+
+ @Override
+ public UpdateSchema updateSchema() {
+ return wrapped.updateSchema();
+ }
+
+ @Override
+ public UpdateProperties updateProperties() {
+ return wrapped.updateProperties();
+ }
+
+ @Override
+ public UpdateLocation updateLocation() {
+ return wrapped.updateLocation();
+ }
+
+ @Override
+ public AppendFiles newFastAppend() {
+ return wrapped.newFastAppend();
+ }
+
+ @Override
+ public AppendFiles newAppend() {
+ return wrapped.newAppend();
+ }
+
+ @Override
+ public RewriteFiles newRewrite() {
+ return wrapped.newRewrite();
+ }
+
+ @Override
+ public RewriteManifests rewriteManifests() {
+ return wrapped.rewriteManifests();
+ }
+
+ @Override
+ public OverwriteFiles newOverwrite() {
+ return wrapped.newOverwrite();
+ }
+
+ @Override
+ public ReplacePartitions newReplacePartitions() {
+ return wrapped.newReplacePartitions();
+ }
+
+ @Override
+ public DeleteFiles newDelete() {
+ return wrapped.newDelete();
+ }
+
+ @Override
+ public ExpireSnapshots expireSnapshots() {
+ return wrapped.expireSnapshots();
+ }
+
+ @Override
+ public void commitTransaction() {
+ wrapped.commitTransaction();
+ callback.run();
+ }
+}