You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by fo...@apache.org on 2023/04/11 11:04:49 UTC

[iceberg] branch master updated: Nessie: Use latest hash for catalog APIs (#6789)

This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new af4ddd2547 Nessie: Use latest hash for catalog APIs (#6789)
af4ddd2547 is described below

commit af4ddd25475ff049c451bec3c567ca89353da87d
Author: Ajantha Bhat <aj...@gmail.com>
AuthorDate: Tue Apr 11 16:34:39 2023 +0530

    Nessie: Use latest hash for catalog APIs (#6789)
    
    * Nessie: Handle refresh for catalog APIs that don't use table operations
    
    * Add commit testcase
    
    * Another test case
    
    * Address comments
    
    * Avoid hash roundtrip
    
    * Address new comments
    
    * refactor
---
 .../apache/iceberg/nessie/NessieIcebergClient.java |  75 +++++-----
 .../apache/iceberg/nessie/UpdateableReference.java |  15 +-
 .../apache/iceberg/nessie/TestMultipleClients.java | 160 +++++++++++++++++++++
 3 files changed, 207 insertions(+), 43 deletions(-)

diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java
index 5256b37d0c..8b17194acb 100644
--- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java
+++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java
@@ -40,6 +40,7 @@ import org.apache.iceberg.util.Tasks;
 import org.projectnessie.client.NessieConfigConstants;
 import org.projectnessie.client.api.CommitMultipleOperationsBuilder;
 import org.projectnessie.client.api.NessieApiV1;
+import org.projectnessie.client.api.OnReferenceBuilder;
 import org.projectnessie.client.http.HttpClientException;
 import org.projectnessie.error.BaseNessieClientServerException;
 import org.projectnessie.error.NessieConflictException;
@@ -132,7 +133,7 @@ public class NessieIcebergClient implements AutoCloseable {
 
   public List<TableIdentifier> listTables(Namespace namespace) {
     try {
-      return api.getEntries().reference(getRef().getReference()).get().getEntries().stream()
+      return withReference(api.getEntries()).get().getEntries().stream()
           .filter(namespacePredicate(namespace))
           .filter(e -> Content.Type.ICEBERG_TABLE == e.getType())
           .map(this::toIdentifier)
@@ -168,7 +169,7 @@ public class NessieIcebergClient implements AutoCloseable {
   public IcebergTable table(TableIdentifier tableIdentifier) {
     try {
       ContentKey key = NessieUtil.toKey(tableIdentifier);
-      Content table = api.getContent().key(key).reference(getRef().getReference()).get().get(key);
+      Content table = withReference(api.getContent().key(key)).get().get(key);
       return table != null ? table.unwrap(IcebergTable.class).orElse(null) : null;
     } catch (NessieNotFoundException e) {
       return null;
@@ -178,11 +179,11 @@ public class NessieIcebergClient implements AutoCloseable {
   public void createNamespace(Namespace namespace, Map<String, String> metadata) {
     try {
       getRef().checkMutable();
-      getApi()
-          .createNamespace()
-          .reference(getRef().getReference())
-          .namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
-          .properties(metadata)
+      withReference(
+              getApi()
+                  .createNamespace()
+                  .namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
+                  .properties(metadata))
           .create();
       refresh();
     } catch (NessieNamespaceAlreadyExistsException e) {
@@ -199,10 +200,10 @@ public class NessieIcebergClient implements AutoCloseable {
   public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException {
     try {
       GetNamespacesResponse response =
-          getApi()
-              .getMultipleNamespaces()
-              .reference(getRef().getReference())
-              .namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
+          withReference(
+                  getApi()
+                      .getMultipleNamespaces()
+                      .namespace(org.projectnessie.model.Namespace.of(namespace.levels())))
               .get();
       return response.getNamespaces().stream()
           .map(ns -> Namespace.of(ns.getElements().toArray(new String[0])))
@@ -220,10 +221,10 @@ public class NessieIcebergClient implements AutoCloseable {
   public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
     try {
       getRef().checkMutable();
-      getApi()
-          .deleteNamespace()
-          .reference(getRef().getReference())
-          .namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
+      withReference(
+              getApi()
+                  .deleteNamespace()
+                  .namespace(org.projectnessie.model.Namespace.of(namespace.levels())))
           .delete();
       refresh();
       return true;
@@ -245,10 +246,10 @@ public class NessieIcebergClient implements AutoCloseable {
   public Map<String, String> loadNamespaceMetadata(Namespace namespace)
       throws NoSuchNamespaceException {
     try {
-      return getApi()
-          .getNamespace()
-          .reference(getRef().getReference())
-          .namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
+      return withReference(
+              getApi()
+                  .getNamespace()
+                  .namespace(org.projectnessie.model.Namespace.of(namespace.levels())))
           .get()
           .getProperties();
     } catch (NessieNamespaceNotFoundException e) {
@@ -264,11 +265,11 @@ public class NessieIcebergClient implements AutoCloseable {
 
   public boolean setProperties(Namespace namespace, Map<String, String> properties) {
     try {
-      getApi()
-          .updateProperties()
-          .reference(getRef().getReference())
-          .namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
-          .updateProperties(properties)
+      withReference(
+              getApi()
+                  .updateProperties()
+                  .namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
+                  .updateProperties(properties))
           .update();
       refresh();
       // always successful, otherwise an exception is thrown
@@ -286,11 +287,11 @@ public class NessieIcebergClient implements AutoCloseable {
 
   public boolean removeProperties(Namespace namespace, Set<String> properties) {
     try {
-      getApi()
-          .updateProperties()
-          .reference(getRef().getReference())
-          .namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
-          .removeProperties(properties)
+      withReference(
+              getApi()
+                  .updateProperties()
+                  .namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
+                  .removeProperties(properties))
           .update();
       refresh();
       // always successful, otherwise an exception is thrown
@@ -336,7 +337,7 @@ public class NessieIcebergClient implements AutoCloseable {
           .onFailure((o, exception) -> refresh())
           .run(
               ops -> {
-                Branch branch = ops.branch(getRef().getAsBranch()).commit();
+                Branch branch = ops.branch((Branch) getRef().getReference()).commit();
                 getRef().updateReference(branch);
               },
               BaseNessieClientServerException.class);
@@ -403,7 +404,7 @@ public class NessieIcebergClient implements AutoCloseable {
           .onFailure((o, exception) -> refresh())
           .run(
               commitBuilder -> {
-                Branch branch = commitBuilder.branch(getRef().getAsBranch()).commit();
+                Branch branch = commitBuilder.branch((Branch) getRef().getReference()).commit();
                 getRef().updateReference(branch);
               },
               BaseNessieClientServerException.class);
@@ -432,7 +433,7 @@ public class NessieIcebergClient implements AutoCloseable {
 
     updateableReference.checkMutable();
 
-    Branch current = updateableReference.getAsBranch();
+    Branch current = (Branch) updateableReference.getReference();
     Branch expectedHead = current;
     if (base != null) {
       String metadataCommitId =
@@ -492,6 +493,16 @@ public class NessieIcebergClient implements AutoCloseable {
             || snapshot.snapshotId() != base.currentSnapshot().snapshotId());
   }
 
+  private <T extends OnReferenceBuilder<?>> T withReference(T builder) {
+    UpdateableReference ref = getRef();
+    if (!ref.isMutable()) {
+      builder.reference(ref.getReference());
+    } else {
+      builder.refName(ref.getName());
+    }
+    return builder;
+  }
+
   private String buildCommitMsg(TableMetadata base, TableMetadata metadata, String tableName) {
     if (isSnapshotOperation(base, metadata)) {
       return String.format(
diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/UpdateableReference.java b/nessie/src/main/java/org/apache/iceberg/nessie/UpdateableReference.java
index 7e49457981..28ef7fe7c2 100644
--- a/nessie/src/main/java/org/apache/iceberg/nessie/UpdateableReference.java
+++ b/nessie/src/main/java/org/apache/iceberg/nessie/UpdateableReference.java
@@ -52,21 +52,10 @@ class UpdateableReference {
     this.reference = Preconditions.checkNotNull(ref, "ref is null");
   }
 
-  public boolean isBranch() {
-    return reference instanceof Branch;
-  }
-
   public String getHash() {
     return reference.getHash();
   }
 
-  public Branch getAsBranch() {
-    if (!isBranch()) {
-      throw new IllegalArgumentException("Reference is not a branch");
-    }
-    return (Branch) reference;
-  }
-
   public Reference getReference() {
     return reference;
   }
@@ -79,4 +68,8 @@ class UpdateableReference {
   public String getName() {
     return reference.getName();
   }
+
+  public boolean isMutable() {
+    return mutable;
+  }
 }
diff --git a/nessie/src/test/java/org/apache/iceberg/nessie/TestMultipleClients.java b/nessie/src/test/java/org/apache/iceberg/nessie/TestMultipleClients.java
new file mode 100644
index 0000000000..8da96c224d
--- /dev/null
+++ b/nessie/src/test/java/org/apache/iceberg/nessie/TestMultipleClients.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.nessie;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.AbstractMap;
+import java.util.Collections;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.projectnessie.client.ext.NessieClientFactory;
+import org.projectnessie.client.ext.NessieClientUri;
+
+public class TestMultipleClients extends BaseTestIceberg {
+
+  private static final String BRANCH = "multiple-clients-test";
+  private static final Schema schema =
+      new Schema(Types.StructType.of(required(1, "id", Types.LongType.get())).fields());
+
+  public TestMultipleClients() {
+    super(BRANCH);
+  }
+
+  // another client that connects to the same nessie server.
+  NessieCatalog anotherCatalog;
+
+  @Override
+  @BeforeEach
+  public void beforeEach(NessieClientFactory clientFactory, @NessieClientUri URI nessieUri)
+      throws IOException {
+    super.beforeEach(clientFactory, nessieUri);
+    anotherCatalog = initCatalog(branch);
+  }
+
+  @AfterEach
+  public void afterEach() throws Exception {
+    anotherCatalog.close();
+  }
+
+  @Test
+  public void testListNamespaces() {
+    catalog.createNamespace(Namespace.of("db1"), Collections.emptyMap());
+    Assertions.assertThat(catalog.listNamespaces()).containsExactlyInAnyOrder(Namespace.of("db1"));
+
+    // another client creates a namespace with the same nessie server
+    anotherCatalog.createNamespace(Namespace.of("db2"), Collections.emptyMap());
+    Assertions.assertThat(anotherCatalog.listNamespaces())
+        .containsExactlyInAnyOrder(Namespace.of("db1"), Namespace.of("db2"));
+
+    Assertions.assertThat(catalog.listNamespaces())
+        .containsExactlyInAnyOrder(Namespace.of("db1"), Namespace.of("db2"));
+  }
+
+  @Test
+  public void testLoadNamespaceMetadata() {
+    catalog.createNamespace(Namespace.of("namespace1"), Collections.emptyMap());
+    Assertions.assertThat(catalog.listNamespaces())
+        .containsExactlyInAnyOrder(Namespace.of("namespace1"));
+
+    // another client adds a metadata to the same namespace
+    anotherCatalog.setProperties(Namespace.of("namespace1"), Collections.singletonMap("k1", "v1"));
+    AbstractMap.SimpleEntry<String, String> entry = new AbstractMap.SimpleEntry<>("k1", "v1");
+    Assertions.assertThat(anotherCatalog.loadNamespaceMetadata(Namespace.of("namespace1")))
+        .containsExactly(entry);
+
+    Assertions.assertThat(catalog.loadNamespaceMetadata(Namespace.of("namespace1")))
+        .containsExactly(entry);
+  }
+
+  @Test
+  public void testListTables() {
+    catalog.createTable(TableIdentifier.parse("foo.tbl1"), schema);
+    Assertions.assertThat(catalog.listTables(Namespace.of("foo")))
+        .containsExactlyInAnyOrder(TableIdentifier.parse("foo.tbl1"));
+
+    // another client creates a table with the same nessie server
+    anotherCatalog.createTable(TableIdentifier.parse("foo.tbl2"), schema);
+    Assertions.assertThat(anotherCatalog.listTables(Namespace.of("foo")))
+        .containsExactlyInAnyOrder(
+            TableIdentifier.parse("foo.tbl1"), TableIdentifier.parse("foo.tbl2"));
+
+    Assertions.assertThat(catalog.listTables(Namespace.of("foo")))
+        .containsExactlyInAnyOrder(
+            TableIdentifier.parse("foo.tbl1"), TableIdentifier.parse("foo.tbl2"));
+  }
+
+  @Test
+  public void testCommits() {
+    TableIdentifier identifier = TableIdentifier.parse("foo.tbl1");
+    catalog.createTable(identifier, schema);
+    Table tableFromCatalog = catalog.loadTable(identifier);
+    tableFromCatalog.updateSchema().addColumn("x1", Types.LongType.get()).commit();
+
+    Table tableFromAnotherCatalog = anotherCatalog.loadTable(identifier);
+    tableFromAnotherCatalog.updateSchema().addColumn("x2", Types.LongType.get()).commit();
+
+    tableFromCatalog.updateSchema().addColumn("x3", Types.LongType.get()).commit();
+    tableFromAnotherCatalog.updateSchema().addColumn("x4", Types.LongType.get()).commit();
+
+    Assertions.assertThat(catalog.loadTable(identifier).schema().columns()).hasSize(5);
+    Assertions.assertThat(anotherCatalog.loadTable(identifier).schema().columns()).hasSize(5);
+  }
+
+  @Test
+  public void testConcurrentCommitsWithRefresh() {
+    TableIdentifier identifier = TableIdentifier.parse("foo.tbl1");
+    catalog.createTable(identifier, schema);
+
+    String hashBefore = catalog.currentHash();
+
+    TableOperations ops1 = catalog.newTableOps(identifier);
+    TableMetadata metadata1 =
+        TableMetadata.buildFrom(ops1.current()).setProperties(ImmutableMap.of("k1", "v1")).build();
+
+    // commit should succeed
+    TableOperations ops2 = catalog.newTableOps(identifier);
+    TableMetadata metadata2 =
+        TableMetadata.buildFrom(ops2.current()).setProperties(ImmutableMap.of("k2", "v2")).build();
+    ops2.commit(ops2.current(), metadata2);
+
+    // refresh the catalog's client.
+    String hashAfter = catalog.currentHash();
+    Assertions.assertThat(hashBefore).isNotEqualTo(hashAfter);
+
+    // client refresh should not affect the ongoing commits (commit should still fail due staleness)
+    Assertions.assertThatThrownBy(() -> ops1.commit(ops1.current(), metadata1))
+        .isInstanceOf(CommitFailedException.class)
+        .hasMessageContaining(
+            "Cannot commit: Reference hash is out of date. Update the reference 'multiple-clients-test' and try again");
+  }
+}