You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@iceberg.apache.org by fo...@apache.org on 2023/04/11 11:04:49 UTC
[iceberg] branch master updated: Nessie: Use latest hash for catalog APIs (#6789)
This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new af4ddd2547 Nessie: Use latest hash for catalog APIs (#6789)
af4ddd2547 is described below
commit af4ddd25475ff049c451bec3c567ca89353da87d
Author: Ajantha Bhat <aj...@gmail.com>
AuthorDate: Tue Apr 11 16:34:39 2023 +0530
Nessie: Use latest hash for catalog APIs (#6789)
* Nessie: Handle refresh for catalog APIs that don't use table operations
* Add commit testcase
* Another test case
* Address comments
* Avoid hash roundtrip
* Address new comments
* refactor
---
.../apache/iceberg/nessie/NessieIcebergClient.java | 75 +++++-----
.../apache/iceberg/nessie/UpdateableReference.java | 15 +-
.../apache/iceberg/nessie/TestMultipleClients.java | 160 +++++++++++++++++++++
3 files changed, 207 insertions(+), 43 deletions(-)
diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java
index 5256b37d0c..8b17194acb 100644
--- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java
+++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java
@@ -40,6 +40,7 @@ import org.apache.iceberg.util.Tasks;
import org.projectnessie.client.NessieConfigConstants;
import org.projectnessie.client.api.CommitMultipleOperationsBuilder;
import org.projectnessie.client.api.NessieApiV1;
+import org.projectnessie.client.api.OnReferenceBuilder;
import org.projectnessie.client.http.HttpClientException;
import org.projectnessie.error.BaseNessieClientServerException;
import org.projectnessie.error.NessieConflictException;
@@ -132,7 +133,7 @@ public class NessieIcebergClient implements AutoCloseable {
public List<TableIdentifier> listTables(Namespace namespace) {
try {
- return api.getEntries().reference(getRef().getReference()).get().getEntries().stream()
+ return withReference(api.getEntries()).get().getEntries().stream()
.filter(namespacePredicate(namespace))
.filter(e -> Content.Type.ICEBERG_TABLE == e.getType())
.map(this::toIdentifier)
@@ -168,7 +169,7 @@ public class NessieIcebergClient implements AutoCloseable {
public IcebergTable table(TableIdentifier tableIdentifier) {
try {
ContentKey key = NessieUtil.toKey(tableIdentifier);
- Content table = api.getContent().key(key).reference(getRef().getReference()).get().get(key);
+ Content table = withReference(api.getContent().key(key)).get().get(key);
return table != null ? table.unwrap(IcebergTable.class).orElse(null) : null;
} catch (NessieNotFoundException e) {
return null;
@@ -178,11 +179,11 @@ public class NessieIcebergClient implements AutoCloseable {
public void createNamespace(Namespace namespace, Map<String, String> metadata) {
try {
getRef().checkMutable();
- getApi()
- .createNamespace()
- .reference(getRef().getReference())
- .namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
- .properties(metadata)
+ withReference(
+ getApi()
+ .createNamespace()
+ .namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
+ .properties(metadata))
.create();
refresh();
} catch (NessieNamespaceAlreadyExistsException e) {
@@ -199,10 +200,10 @@ public class NessieIcebergClient implements AutoCloseable {
public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException {
try {
GetNamespacesResponse response =
- getApi()
- .getMultipleNamespaces()
- .reference(getRef().getReference())
- .namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
+ withReference(
+ getApi()
+ .getMultipleNamespaces()
+ .namespace(org.projectnessie.model.Namespace.of(namespace.levels())))
.get();
return response.getNamespaces().stream()
.map(ns -> Namespace.of(ns.getElements().toArray(new String[0])))
@@ -220,10 +221,10 @@ public class NessieIcebergClient implements AutoCloseable {
public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
try {
getRef().checkMutable();
- getApi()
- .deleteNamespace()
- .reference(getRef().getReference())
- .namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
+ withReference(
+ getApi()
+ .deleteNamespace()
+ .namespace(org.projectnessie.model.Namespace.of(namespace.levels())))
.delete();
refresh();
return true;
@@ -245,10 +246,10 @@ public class NessieIcebergClient implements AutoCloseable {
public Map<String, String> loadNamespaceMetadata(Namespace namespace)
throws NoSuchNamespaceException {
try {
- return getApi()
- .getNamespace()
- .reference(getRef().getReference())
- .namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
+ return withReference(
+ getApi()
+ .getNamespace()
+ .namespace(org.projectnessie.model.Namespace.of(namespace.levels())))
.get()
.getProperties();
} catch (NessieNamespaceNotFoundException e) {
@@ -264,11 +265,11 @@ public class NessieIcebergClient implements AutoCloseable {
public boolean setProperties(Namespace namespace, Map<String, String> properties) {
try {
- getApi()
- .updateProperties()
- .reference(getRef().getReference())
- .namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
- .updateProperties(properties)
+ withReference(
+ getApi()
+ .updateProperties()
+ .namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
+ .updateProperties(properties))
.update();
refresh();
// always successful, otherwise an exception is thrown
@@ -286,11 +287,11 @@ public class NessieIcebergClient implements AutoCloseable {
public boolean removeProperties(Namespace namespace, Set<String> properties) {
try {
- getApi()
- .updateProperties()
- .reference(getRef().getReference())
- .namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
- .removeProperties(properties)
+ withReference(
+ getApi()
+ .updateProperties()
+ .namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
+ .removeProperties(properties))
.update();
refresh();
// always successful, otherwise an exception is thrown
@@ -336,7 +337,7 @@ public class NessieIcebergClient implements AutoCloseable {
.onFailure((o, exception) -> refresh())
.run(
ops -> {
- Branch branch = ops.branch(getRef().getAsBranch()).commit();
+ Branch branch = ops.branch((Branch) getRef().getReference()).commit();
getRef().updateReference(branch);
},
BaseNessieClientServerException.class);
@@ -403,7 +404,7 @@ public class NessieIcebergClient implements AutoCloseable {
.onFailure((o, exception) -> refresh())
.run(
commitBuilder -> {
- Branch branch = commitBuilder.branch(getRef().getAsBranch()).commit();
+ Branch branch = commitBuilder.branch((Branch) getRef().getReference()).commit();
getRef().updateReference(branch);
},
BaseNessieClientServerException.class);
@@ -432,7 +433,7 @@ public class NessieIcebergClient implements AutoCloseable {
updateableReference.checkMutable();
- Branch current = updateableReference.getAsBranch();
+ Branch current = (Branch) updateableReference.getReference();
Branch expectedHead = current;
if (base != null) {
String metadataCommitId =
@@ -492,6 +493,16 @@ public class NessieIcebergClient implements AutoCloseable {
|| snapshot.snapshotId() != base.currentSnapshot().snapshotId());
}
+ private <T extends OnReferenceBuilder<?>> T withReference(T builder) {
+ UpdateableReference ref = getRef(); // decides which Nessie ref the request targets
+ if (!ref.isMutable()) {
+ builder.reference(ref.getReference()); // immutable: the locally held reference, incl. its hash
+ } else {
+ builder.refName(ref.getName()); // mutable: name only (no hash), so the latest hash is used
+ }
+ return builder;
+ }
+
private String buildCommitMsg(TableMetadata base, TableMetadata metadata, String tableName) {
if (isSnapshotOperation(base, metadata)) {
return String.format(
diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/UpdateableReference.java b/nessie/src/main/java/org/apache/iceberg/nessie/UpdateableReference.java
index 7e49457981..28ef7fe7c2 100644
--- a/nessie/src/main/java/org/apache/iceberg/nessie/UpdateableReference.java
+++ b/nessie/src/main/java/org/apache/iceberg/nessie/UpdateableReference.java
@@ -52,21 +52,10 @@ class UpdateableReference {
this.reference = Preconditions.checkNotNull(ref, "ref is null");
}
- public boolean isBranch() {
- return reference instanceof Branch;
- }
-
public String getHash() {
return reference.getHash();
}
- public Branch getAsBranch() {
- if (!isBranch()) {
- throw new IllegalArgumentException("Reference is not a branch");
- }
- return (Branch) reference;
- }
-
public Reference getReference() {
return reference;
}
@@ -79,4 +68,8 @@ class UpdateableReference {
public String getName() {
return reference.getName();
}
+
+ public boolean isMutable() {
+ return mutable; // read by NessieIcebergClient.withReference: name-only vs. pinned reference
+ }
}
diff --git a/nessie/src/test/java/org/apache/iceberg/nessie/TestMultipleClients.java b/nessie/src/test/java/org/apache/iceberg/nessie/TestMultipleClients.java
new file mode 100644
index 0000000000..8da96c224d
--- /dev/null
+++ b/nessie/src/test/java/org/apache/iceberg/nessie/TestMultipleClients.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.nessie;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.AbstractMap;
+import java.util.Collections;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.projectnessie.client.ext.NessieClientFactory;
+import org.projectnessie.client.ext.NessieClientUri;
+
+public class TestMultipleClients extends BaseTestIceberg {
+ // Catalog APIs must reflect changes another client made on the same branch; stale commits fail.
+ private static final String BRANCH = "multiple-clients-test";
+ private static final Schema schema =
+ new Schema(Types.StructType.of(required(1, "id", Types.LongType.get())).fields());
+
+ public TestMultipleClients() {
+ super(BRANCH);
+ }
+
+ // A second catalog (separate client) on the same Nessie server, initialized with the same branch.
+ NessieCatalog anotherCatalog;
+
+ @Override
+ @BeforeEach
+ public void beforeEach(NessieClientFactory clientFactory, @NessieClientUri URI nessieUri)
+ throws IOException {
+ super.beforeEach(clientFactory, nessieUri);
+ anotherCatalog = initCatalog(branch);
+ }
+
+ @AfterEach
+ public void afterEach() throws Exception {
+ anotherCatalog.close();
+ }
+
+ @Test
+ public void testListNamespaces() {
+ catalog.createNamespace(Namespace.of("db1"), Collections.emptyMap());
+ Assertions.assertThat(catalog.listNamespaces()).containsExactlyInAnyOrder(Namespace.of("db1"));
+
+ // the second client creates a namespace on the same Nessie server
+ anotherCatalog.createNamespace(Namespace.of("db2"), Collections.emptyMap());
+ Assertions.assertThat(anotherCatalog.listNamespaces())
+ .containsExactlyInAnyOrder(Namespace.of("db1"), Namespace.of("db2"));
+ // the first catalog must also see the namespace created by the second client
+ Assertions.assertThat(catalog.listNamespaces())
+ .containsExactlyInAnyOrder(Namespace.of("db1"), Namespace.of("db2"));
+ }
+
+ @Test
+ public void testLoadNamespaceMetadata() {
+ catalog.createNamespace(Namespace.of("namespace1"), Collections.emptyMap());
+ Assertions.assertThat(catalog.listNamespaces())
+ .containsExactlyInAnyOrder(Namespace.of("namespace1"));
+
+ // the second client adds a property (k1=v1) to the same namespace
+ anotherCatalog.setProperties(Namespace.of("namespace1"), Collections.singletonMap("k1", "v1"));
+ AbstractMap.SimpleEntry<String, String> entry = new AbstractMap.SimpleEntry<>("k1", "v1");
+ Assertions.assertThat(anotherCatalog.loadNamespaceMetadata(Namespace.of("namespace1")))
+ .containsExactly(entry);
+ // the first catalog must see the property set by the second client
+ Assertions.assertThat(catalog.loadNamespaceMetadata(Namespace.of("namespace1")))
+ .containsExactly(entry);
+ }
+
+ @Test
+ public void testListTables() {
+ catalog.createTable(TableIdentifier.parse("foo.tbl1"), schema);
+ Assertions.assertThat(catalog.listTables(Namespace.of("foo")))
+ .containsExactlyInAnyOrder(TableIdentifier.parse("foo.tbl1"));
+
+ // the second client creates a table on the same Nessie server
+ anotherCatalog.createTable(TableIdentifier.parse("foo.tbl2"), schema);
+ Assertions.assertThat(anotherCatalog.listTables(Namespace.of("foo")))
+ .containsExactlyInAnyOrder(
+ TableIdentifier.parse("foo.tbl1"), TableIdentifier.parse("foo.tbl2"));
+ // the first catalog must list the table created by the second client
+ Assertions.assertThat(catalog.listTables(Namespace.of("foo")))
+ .containsExactlyInAnyOrder(
+ TableIdentifier.parse("foo.tbl1"), TableIdentifier.parse("foo.tbl2"));
+ }
+
+ @Test
+ public void testCommits() {
+ TableIdentifier identifier = TableIdentifier.parse("foo.tbl1");
+ catalog.createTable(identifier, schema);
+ Table tableFromCatalog = catalog.loadTable(identifier);
+ tableFromCatalog.updateSchema().addColumn("x1", Types.LongType.get()).commit();
+
+ Table tableFromAnotherCatalog = anotherCatalog.loadTable(identifier);
+ tableFromAnotherCatalog.updateSchema().addColumn("x2", Types.LongType.get()).commit();
+ // interleaved schema commits from both clients must not overwrite each other
+ tableFromCatalog.updateSchema().addColumn("x3", Types.LongType.get()).commit();
+ tableFromAnotherCatalog.updateSchema().addColumn("x4", Types.LongType.get()).commit();
+ // id plus x1..x4: both catalogs must see all five columns
+ Assertions.assertThat(catalog.loadTable(identifier).schema().columns()).hasSize(5);
+ Assertions.assertThat(anotherCatalog.loadTable(identifier).schema().columns()).hasSize(5);
+ }
+
+ @Test
+ public void testConcurrentCommitsWithRefresh() {
+ TableIdentifier identifier = TableIdentifier.parse("foo.tbl1");
+ catalog.createTable(identifier, schema);
+
+ String hashBefore = catalog.currentHash();
+ // ops1 prepares a change here but only tries to commit it at the end
+ TableOperations ops1 = catalog.newTableOps(identifier);
+ TableMetadata metadata1 =
+ TableMetadata.buildFrom(ops1.current()).setProperties(ImmutableMap.of("k1", "v1")).build();
+
+ // ops2 commits first; this commit should succeed and advance the branch
+ TableOperations ops2 = catalog.newTableOps(identifier);
+ TableMetadata metadata2 =
+ TableMetadata.buildFrom(ops2.current()).setProperties(ImmutableMap.of("k2", "v2")).build();
+ ops2.commit(ops2.current(), metadata2);
+
+ // refresh the catalog's client; its hash must have moved past hashBefore
+ String hashAfter = catalog.currentHash();
+ Assertions.assertThat(hashBefore).isNotEqualTo(hashAfter);
+
+ // client refresh must not affect the in-flight ops1 commit (it should still fail due to staleness)
+ Assertions.assertThatThrownBy(() -> ops1.commit(ops1.current(), metadata1))
+ .isInstanceOf(CommitFailedException.class)
+ .hasMessageContaining(
+ "Cannot commit: Reference hash is out of date. Update the reference 'multiple-clients-test' and try again");
+ }
+}