You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ru...@apache.org on 2022/05/09 15:54:14 UTC

[iceberg] branch master updated: Nessie: Use ref.hash parameter to read data at given hash (#4700)

This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new c886ba5b8 Nessie: Use ref.hash parameter to read data at given hash (#4700)
c886ba5b8 is described below

commit c886ba5b8ba931706550e72f13c23bd88e537521
Author: Eduard Tudenhöfner <et...@gmail.com>
AuthorDate: Mon May 9 17:54:07 2022 +0200

    Nessie: Use ref.hash parameter to read data at given hash (#4700)
    
    This makes sure that the `NessieCatalog` reads data at the given
    `ref.hash` (`NessieConfigConstants.CONF_NESSIE_REF_HASH`) when it is provided.
    If `ref.hash` is `null`, data is read from the latest `HEAD` instead.
---
 .../org/apache/iceberg/nessie/NessieCatalog.java   | 14 ++--
 .../apache/iceberg/nessie/NessieIcebergClient.java | 11 ++-
 .../org/apache/iceberg/nessie/BaseTestIceberg.java | 18 +++--
 .../iceberg/nessie/TestBranchVisibility.java       | 45 +++++++++++
 .../iceberg/nessie/TestNessieIcebergClient.java    | 87 ++++++++++++++++++++++
 versions.props                                     |  2 +-
 6 files changed, 160 insertions(+), 17 deletions(-)

diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java
index 482467c78..4f208a286 100644
--- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java
+++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java
@@ -81,12 +81,13 @@ public class NessieCatalog extends BaseMetastoreCatalog implements AutoCloseable
     // remove nessie prefix
     final Function<String, String> removePrefix = x -> x.replace(NessieUtil.NESSIE_CONFIG_PREFIX, "");
     final String requestedRef = options.get(removePrefix.apply(NessieConfigConstants.CONF_NESSIE_REF));
+    String requestedHash = options.get(removePrefix.apply(NessieConfigConstants.CONF_NESSIE_REF_HASH));
     NessieApiV1 api = createNessieClientBuilder(options.get(NessieConfigConstants.CONF_NESSIE_CLIENT_BUILDER_IMPL))
         .fromConfig(x -> options.get(removePrefix.apply(x)))
         .build(NessieApiV1.class);
 
     initialize(name,
-        new NessieIcebergClient(api, requestedRef, null, catalogOptions),
+        new NessieIcebergClient(api, requestedRef, requestedHash, catalogOptions),
         fileIOImpl == null ? new HadoopFileIO(config) : CatalogUtil.loadFileIO(fileIOImpl, options, config),
         catalogOptions);
   }
@@ -223,16 +224,12 @@ public class NessieCatalog extends BaseMetastoreCatalog implements AutoCloseable
 
   @Override
   public boolean setProperties(Namespace namespace, Map<String, String> properties) {
-    client.setProperties(namespace, properties);
-    // always successful, otherwise an exception is thrown
-    return true;
+    return client.setProperties(namespace, properties);
   }
 
   @Override
   public boolean removeProperties(Namespace namespace, Set<String> properties) {
-    client.removeProperties(namespace, properties);
-    // always successful, otherwise an exception is thrown
-    return true;
+    return client.removeProperties(namespace, properties);
   }
 
   @Override
@@ -245,7 +242,8 @@ public class NessieCatalog extends BaseMetastoreCatalog implements AutoCloseable
     return config;
   }
 
-  public String currentHash() {
+  @VisibleForTesting
+  String currentHash() {
     return client.getRef().getHash();
   }
 
diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java
index b649512c9..39adeba67 100644
--- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java
+++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java
@@ -87,7 +87,8 @@ public class NessieIcebergClient implements AutoCloseable {
   }
 
   public NessieIcebergClient withReference(String requestedRef, String hash) {
-    if (null == requestedRef) {
+    if (null == requestedRef ||
+        (getRef().getReference().getName().equals(requestedRef) && getRef().getHash().equals(hash))) {
       return this;
     }
     return new NessieIcebergClient(getApi(), requestedRef, hash, catalogOptions);
@@ -231,7 +232,7 @@ public class NessieIcebergClient implements AutoCloseable {
     }
   }
 
-  public void setProperties(Namespace namespace, Map<String, String> properties) {
+  public boolean setProperties(Namespace namespace, Map<String, String> properties) {
     try {
       getApi()
           .updateProperties()
@@ -240,6 +241,8 @@ public class NessieIcebergClient implements AutoCloseable {
           .updateProperties(properties)
           .update();
       refresh();
+      // always successful, otherwise an exception is thrown
+      return true;
     } catch (NessieNamespaceNotFoundException e) {
       throw new NoSuchNamespaceException(e, "Namespace does not exist: %s", namespace);
     } catch (NessieNotFoundException e) {
@@ -249,7 +252,7 @@ public class NessieIcebergClient implements AutoCloseable {
     }
   }
 
-  public void removeProperties(Namespace namespace, Set<String> properties) {
+  public boolean removeProperties(Namespace namespace, Set<String> properties) {
     try {
       getApi()
           .updateProperties()
@@ -258,6 +261,8 @@ public class NessieIcebergClient implements AutoCloseable {
           .removeProperties(properties)
           .update();
       refresh();
+      // always successful, otherwise an exception is thrown
+      return true;
     } catch (NessieNamespaceNotFoundException e) {
       throw new NoSuchNamespaceException(e, "Namespace does not exist: %s", namespace);
     } catch (NessieNotFoundException e) {
diff --git a/nessie/src/test/java/org/apache/iceberg/nessie/BaseTestIceberg.java b/nessie/src/test/java/org/apache/iceberg/nessie/BaseTestIceberg.java
index 756694401..9c6c98058 100644
--- a/nessie/src/test/java/org/apache/iceberg/nessie/BaseTestIceberg.java
+++ b/nessie/src/test/java/org/apache/iceberg/nessie/BaseTestIceberg.java
@@ -122,13 +122,21 @@ public abstract class BaseTestIceberg {
   }
 
   NessieCatalog initCatalog(String ref) {
+    return initCatalog(ref, null);
+  }
+
+  NessieCatalog initCatalog(String ref, String hash) {
     NessieCatalog newCatalog = new NessieCatalog();
     newCatalog.setConf(hadoopConfig);
-    newCatalog.initialize("nessie", ImmutableMap.of("ref", ref,
-        CatalogProperties.URI, uri,
-        "auth-type", "NONE",
-        CatalogProperties.WAREHOUSE_LOCATION, temp.toUri().toString()
-    ));
+    ImmutableMap.Builder<String, String> options = ImmutableMap.<String, String>builder()
+        .put("ref", ref)
+        .put(CatalogProperties.URI, uri)
+        .put("auth-type", "NONE")
+        .put(CatalogProperties.WAREHOUSE_LOCATION, temp.toUri().toString());
+    if (null != hash) {
+      options.put("ref.hash", hash);
+    }
+    newCatalog.initialize("nessie", options.build());
     return newCatalog;
   }
 
diff --git a/nessie/src/test/java/org/apache/iceberg/nessie/TestBranchVisibility.java b/nessie/src/test/java/org/apache/iceberg/nessie/TestBranchVisibility.java
index faf1c5957..3ca769e73 100644
--- a/nessie/src/test/java/org/apache/iceberg/nessie/TestBranchVisibility.java
+++ b/nessie/src/test/java/org/apache/iceberg/nessie/TestBranchVisibility.java
@@ -24,11 +24,13 @@ import java.util.Map;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadataParser;
 import org.apache.iceberg.Transaction;
 import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.types.Type;
@@ -48,6 +50,8 @@ import org.projectnessie.model.ContentKey;
 import org.projectnessie.model.IcebergTable;
 import org.projectnessie.model.Reference;
 
+import static org.apache.iceberg.types.Types.NestedField.required;
+
 public class TestBranchVisibility extends BaseTestIceberg {
 
   private final TableIdentifier tableIdentifier1 = TableIdentifier.of("test-ns", "table1");
@@ -398,4 +402,45 @@ public class TestBranchVisibility extends BaseTestIceberg {
       assertion.isNotEqualTo(testTable2);
     }
   }
+
+  @Test
+  public void testWithRefAndHash() throws NessieConflictException, NessieNotFoundException {
+    String testBranch = "testBranch";
+    createBranch(testBranch, null);
+    Schema schema = new Schema(Types.StructType.of(required(1, "id", Types.LongType.get())).fields());
+
+    NessieCatalog nessieCatalog = initCatalog(testBranch);
+    String hashBeforeNamespaceCreation = api.getReference().refName(testBranch).get().getHash();
+    Namespace namespace = Namespace.of("a", "b");
+    Assertions.assertThat(nessieCatalog.listNamespaces(namespace)).isEmpty();
+
+    nessieCatalog.createNamespace(namespace);
+    Assertions.assertThat(nessieCatalog.listNamespaces(namespace)).isNotEmpty();
+    Assertions.assertThat(nessieCatalog.listTables(namespace)).isEmpty();
+
+    NessieCatalog catalogAtHash1 = initCatalog(testBranch, hashBeforeNamespaceCreation);
+    Assertions.assertThat(catalogAtHash1.listNamespaces(namespace)).isEmpty();
+    Assertions.assertThat(catalogAtHash1.listTables(namespace)).isEmpty();
+
+    TableIdentifier identifier = TableIdentifier.of(namespace, "table");
+    String hashBeforeTableCreation = nessieCatalog.currentHash();
+    nessieCatalog.createTable(identifier, schema);
+    Assertions.assertThat(nessieCatalog.listTables(namespace)).hasSize(1);
+
+    NessieCatalog catalogAtHash2 = initCatalog(testBranch, hashBeforeTableCreation);
+    Assertions.assertThat(catalogAtHash2.listNamespaces(namespace)).isNotEmpty();
+    Assertions.assertThat(catalogAtHash2.listTables(namespace)).isEmpty();
+
+    // updates should not be possible
+    Assertions.assertThatThrownBy(() -> catalogAtHash2.createTable(identifier, schema))
+            .isInstanceOf(IllegalArgumentException.class)
+            .hasMessage("You can only mutate tables when using a branch without a hash or timestamp.");
+    Assertions.assertThat(catalogAtHash2.listTables(namespace)).isEmpty();
+
+    // updates should be still possible here
+    nessieCatalog = initCatalog(testBranch);
+    TableIdentifier identifier2 = TableIdentifier.of(namespace, "table2");
+    nessieCatalog.createTable(identifier2, schema);
+    Assertions.assertThat(nessieCatalog.listTables(namespace)).hasSize(2);
+  }
 }
diff --git a/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieIcebergClient.java b/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieIcebergClient.java
new file mode 100644
index 000000000..1a469b9b7
--- /dev/null
+++ b/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieIcebergClient.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.nessie;
+
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.projectnessie.error.NessieConflictException;
+import org.projectnessie.error.NessieNotFoundException;
+import org.projectnessie.model.Branch;
+import org.projectnessie.model.Reference;
+
+public class TestNessieIcebergClient extends BaseTestIceberg {
+
+  private static final String BRANCH = "test-nessie-client";
+
+  public TestNessieIcebergClient() {
+    super(BRANCH);
+  }
+
+  @Test
+  public void testWithNullRefLoadsMain() throws NessieNotFoundException {
+    NessieIcebergClient client = new NessieIcebergClient(api, null, null, ImmutableMap.of());
+    Assertions.assertThat(client.getRef().getReference())
+        .isEqualTo(api.getReference().refName("main").get());
+  }
+
+  @Test
+  public void testWithNullHash() throws NessieNotFoundException {
+    NessieIcebergClient client = new NessieIcebergClient(api, BRANCH, null, ImmutableMap.of());
+    Assertions.assertThat(client.getRef().getReference())
+        .isEqualTo(api.getReference().refName(BRANCH).get());
+  }
+
+  @Test
+  public void testWithReference() throws NessieNotFoundException {
+    NessieIcebergClient client = new NessieIcebergClient(api, "main", null, ImmutableMap.of());
+
+    Assertions.assertThat(client.withReference(null, null)).isEqualTo(client);
+    Assertions.assertThat(client.withReference("main", null)).isNotEqualTo(client);
+    Assertions.assertThat(
+            client.withReference("main", api.getReference().refName("main").get().getHash()))
+        .isEqualTo(client);
+
+    Assertions.assertThat(client.withReference(BRANCH, null)).isNotEqualTo(client);
+    Assertions.assertThat(
+            client.withReference(BRANCH, api.getReference().refName(BRANCH).get().getHash()))
+        .isNotEqualTo(client);
+  }
+
+  @Test
+  public void testWithReferenceAfterRecreatingBranch() throws NessieConflictException, NessieNotFoundException {
+    String branch = "branchToBeDropped";
+    createBranch(branch, null);
+    NessieIcebergClient client = new NessieIcebergClient(api, branch, null, ImmutableMap.of());
+
+    // just create a new commit on the branch and then delete & re-create it
+    Namespace namespace = Namespace.of("a");
+    client.createNamespace(namespace, ImmutableMap.of());
+    Assertions.assertThat(client.listNamespaces(namespace)).isNotNull();
+    client.getApi().deleteBranch().branch((Branch) client.getApi().getReference().refName(branch).get()).delete();
+    createBranch(branch, null);
+
+    // make sure the client uses the re-created branch
+    Reference ref = client.getApi().getReference().refName(branch).get();
+    Assertions.assertThat(client.withReference(branch, null).getRef().getReference()).isEqualTo(ref);
+    Assertions.assertThat(client.withReference(branch, null)).isNotEqualTo(client);
+  }
+}
diff --git a/versions.props b/versions.props
index 3a8203cf2..414d19c7a 100644
--- a/versions.props
+++ b/versions.props
@@ -24,7 +24,7 @@ javax.xml.bind:jaxb-api = 2.3.1
 javax.activation:activation = 1.1.1
 org.glassfish.jaxb:jaxb-runtime = 2.3.3
 software.amazon.awssdk:* = 2.17.131
-org.projectnessie:* = 0.28.0
+org.projectnessie:* = 0.29.0
 com.google.cloud:libraries-bom = 24.1.0
 org.scala-lang.modules:scala-collection-compat_2.12 = 2.6.0
 org.scala-lang.modules:scala-collection-compat_2.13 = 2.6.0