You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ru...@apache.org on 2022/05/09 15:54:14 UTC
[iceberg] branch master updated: Nessie: Use ref.hash parameter to read data at given hash (#4700)
This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new c886ba5b8 Nessie: Use ref.hash parameter to read data at given hash (#4700)
c886ba5b8 is described below
commit c886ba5b8ba931706550e72f13c23bd88e537521
Author: Eduard Tudenhöfner <et...@gmail.com>
AuthorDate: Mon May 9 17:54:07 2022 +0200
Nessie: Use ref.hash parameter to read data at given hash (#4700)
This makes sure that the `NessieCatalog` reads data at the given
`ref.hash` (`NessieConfigConstants.CONF_NESSIE_REF_HASH`) when one is provided. If `ref.hash` is
not set (`null`), data is read from the latest `HEAD` of the configured reference.
---
.../org/apache/iceberg/nessie/NessieCatalog.java | 14 ++--
.../apache/iceberg/nessie/NessieIcebergClient.java | 11 ++-
.../org/apache/iceberg/nessie/BaseTestIceberg.java | 18 +++--
.../iceberg/nessie/TestBranchVisibility.java | 45 +++++++++++
.../iceberg/nessie/TestNessieIcebergClient.java | 87 ++++++++++++++++++++++
versions.props | 2 +-
6 files changed, 160 insertions(+), 17 deletions(-)
diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java
index 482467c78..4f208a286 100644
--- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java
+++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java
@@ -81,12 +81,13 @@ public class NessieCatalog extends BaseMetastoreCatalog implements AutoCloseable
// remove nessie prefix
final Function<String, String> removePrefix = x -> x.replace(NessieUtil.NESSIE_CONFIG_PREFIX, "");
final String requestedRef = options.get(removePrefix.apply(NessieConfigConstants.CONF_NESSIE_REF));
+ String requestedHash = options.get(removePrefix.apply(NessieConfigConstants.CONF_NESSIE_REF_HASH));
NessieApiV1 api = createNessieClientBuilder(options.get(NessieConfigConstants.CONF_NESSIE_CLIENT_BUILDER_IMPL))
.fromConfig(x -> options.get(removePrefix.apply(x)))
.build(NessieApiV1.class);
initialize(name,
- new NessieIcebergClient(api, requestedRef, null, catalogOptions),
+ new NessieIcebergClient(api, requestedRef, requestedHash, catalogOptions),
fileIOImpl == null ? new HadoopFileIO(config) : CatalogUtil.loadFileIO(fileIOImpl, options, config),
catalogOptions);
}
@@ -223,16 +224,12 @@ public class NessieCatalog extends BaseMetastoreCatalog implements AutoCloseable
@Override
public boolean setProperties(Namespace namespace, Map<String, String> properties) {
- client.setProperties(namespace, properties);
- // always successful, otherwise an exception is thrown
- return true;
+ return client.setProperties(namespace, properties);
}
@Override
public boolean removeProperties(Namespace namespace, Set<String> properties) {
- client.removeProperties(namespace, properties);
- // always successful, otherwise an exception is thrown
- return true;
+ return client.removeProperties(namespace, properties);
}
@Override
@@ -245,7 +242,8 @@ public class NessieCatalog extends BaseMetastoreCatalog implements AutoCloseable
return config;
}
- public String currentHash() {
+ @VisibleForTesting
+ String currentHash() {
return client.getRef().getHash();
}
diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java
index b649512c9..39adeba67 100644
--- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java
+++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java
@@ -87,7 +87,8 @@ public class NessieIcebergClient implements AutoCloseable {
}
public NessieIcebergClient withReference(String requestedRef, String hash) {
- if (null == requestedRef) {
+ if (null == requestedRef ||
+ (getRef().getReference().getName().equals(requestedRef) && getRef().getHash().equals(hash))) {
return this;
}
return new NessieIcebergClient(getApi(), requestedRef, hash, catalogOptions);
@@ -231,7 +232,7 @@ public class NessieIcebergClient implements AutoCloseable {
}
}
- public void setProperties(Namespace namespace, Map<String, String> properties) {
+ public boolean setProperties(Namespace namespace, Map<String, String> properties) {
try {
getApi()
.updateProperties()
@@ -240,6 +241,8 @@ public class NessieIcebergClient implements AutoCloseable {
.updateProperties(properties)
.update();
refresh();
+ // always successful, otherwise an exception is thrown
+ return true;
} catch (NessieNamespaceNotFoundException e) {
throw new NoSuchNamespaceException(e, "Namespace does not exist: %s", namespace);
} catch (NessieNotFoundException e) {
@@ -249,7 +252,7 @@ public class NessieIcebergClient implements AutoCloseable {
}
}
- public void removeProperties(Namespace namespace, Set<String> properties) {
+ public boolean removeProperties(Namespace namespace, Set<String> properties) {
try {
getApi()
.updateProperties()
@@ -258,6 +261,8 @@ public class NessieIcebergClient implements AutoCloseable {
.removeProperties(properties)
.update();
refresh();
+ // always successful, otherwise an exception is thrown
+ return true;
} catch (NessieNamespaceNotFoundException e) {
throw new NoSuchNamespaceException(e, "Namespace does not exist: %s", namespace);
} catch (NessieNotFoundException e) {
diff --git a/nessie/src/test/java/org/apache/iceberg/nessie/BaseTestIceberg.java b/nessie/src/test/java/org/apache/iceberg/nessie/BaseTestIceberg.java
index 756694401..9c6c98058 100644
--- a/nessie/src/test/java/org/apache/iceberg/nessie/BaseTestIceberg.java
+++ b/nessie/src/test/java/org/apache/iceberg/nessie/BaseTestIceberg.java
@@ -122,13 +122,21 @@ public abstract class BaseTestIceberg {
}
NessieCatalog initCatalog(String ref) {
+ return initCatalog(ref, null);
+ }
+
+ NessieCatalog initCatalog(String ref, String hash) {
NessieCatalog newCatalog = new NessieCatalog();
newCatalog.setConf(hadoopConfig);
- newCatalog.initialize("nessie", ImmutableMap.of("ref", ref,
- CatalogProperties.URI, uri,
- "auth-type", "NONE",
- CatalogProperties.WAREHOUSE_LOCATION, temp.toUri().toString()
- ));
+ ImmutableMap.Builder<String, String> options = ImmutableMap.<String, String>builder()
+ .put("ref", ref)
+ .put(CatalogProperties.URI, uri)
+ .put("auth-type", "NONE")
+ .put(CatalogProperties.WAREHOUSE_LOCATION, temp.toUri().toString());
+ if (null != hash) {
+ options.put("ref.hash", hash);
+ }
+ newCatalog.initialize("nessie", options.build());
return newCatalog;
}
diff --git a/nessie/src/test/java/org/apache/iceberg/nessie/TestBranchVisibility.java b/nessie/src/test/java/org/apache/iceberg/nessie/TestBranchVisibility.java
index faf1c5957..3ca769e73 100644
--- a/nessie/src/test/java/org/apache/iceberg/nessie/TestBranchVisibility.java
+++ b/nessie/src/test/java/org/apache/iceberg/nessie/TestBranchVisibility.java
@@ -24,11 +24,13 @@ import java.util.Map;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Type;
@@ -48,6 +50,8 @@ import org.projectnessie.model.ContentKey;
import org.projectnessie.model.IcebergTable;
import org.projectnessie.model.Reference;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
public class TestBranchVisibility extends BaseTestIceberg {
private final TableIdentifier tableIdentifier1 = TableIdentifier.of("test-ns", "table1");
@@ -398,4 +402,45 @@ public class TestBranchVisibility extends BaseTestIceberg {
assertion.isNotEqualTo(testTable2);
}
}
+
+ @Test
+ public void testWithRefAndHash() throws NessieConflictException, NessieNotFoundException {
+ String testBranch = "testBranch";
+ createBranch(testBranch, null);
+ Schema schema = new Schema(Types.StructType.of(required(1, "id", Types.LongType.get())).fields());
+
+ NessieCatalog nessieCatalog = initCatalog(testBranch);
+ String hashBeforeNamespaceCreation = api.getReference().refName(testBranch).get().getHash();
+ Namespace namespace = Namespace.of("a", "b");
+ Assertions.assertThat(nessieCatalog.listNamespaces(namespace)).isEmpty();
+
+ nessieCatalog.createNamespace(namespace);
+ Assertions.assertThat(nessieCatalog.listNamespaces(namespace)).isNotEmpty();
+ Assertions.assertThat(nessieCatalog.listTables(namespace)).isEmpty();
+
+ NessieCatalog catalogAtHash1 = initCatalog(testBranch, hashBeforeNamespaceCreation);
+ Assertions.assertThat(catalogAtHash1.listNamespaces(namespace)).isEmpty();
+ Assertions.assertThat(catalogAtHash1.listTables(namespace)).isEmpty();
+
+ TableIdentifier identifier = TableIdentifier.of(namespace, "table");
+ String hashBeforeTableCreation = nessieCatalog.currentHash();
+ nessieCatalog.createTable(identifier, schema);
+ Assertions.assertThat(nessieCatalog.listTables(namespace)).hasSize(1);
+
+ NessieCatalog catalogAtHash2 = initCatalog(testBranch, hashBeforeTableCreation);
+ Assertions.assertThat(catalogAtHash2.listNamespaces(namespace)).isNotEmpty();
+ Assertions.assertThat(catalogAtHash2.listTables(namespace)).isEmpty();
+
+ // updates should not be possible
+ Assertions.assertThatThrownBy(() -> catalogAtHash2.createTable(identifier, schema))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("You can only mutate tables when using a branch without a hash or timestamp.");
+ Assertions.assertThat(catalogAtHash2.listTables(namespace)).isEmpty();
+
+ // updates should be still possible here
+ nessieCatalog = initCatalog(testBranch);
+ TableIdentifier identifier2 = TableIdentifier.of(namespace, "table2");
+ nessieCatalog.createTable(identifier2, schema);
+ Assertions.assertThat(nessieCatalog.listTables(namespace)).hasSize(2);
+ }
}
diff --git a/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieIcebergClient.java b/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieIcebergClient.java
new file mode 100644
index 000000000..1a469b9b7
--- /dev/null
+++ b/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieIcebergClient.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.nessie;
+
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.projectnessie.error.NessieConflictException;
+import org.projectnessie.error.NessieNotFoundException;
+import org.projectnessie.model.Branch;
+import org.projectnessie.model.Reference;
+
+public class TestNessieIcebergClient extends BaseTestIceberg {
+
+ private static final String BRANCH = "test-nessie-client";
+
+ public TestNessieIcebergClient() {
+ super(BRANCH);
+ }
+
+ @Test
+ public void testWithNullRefLoadsMain() throws NessieNotFoundException {
+ NessieIcebergClient client = new NessieIcebergClient(api, null, null, ImmutableMap.of());
+ Assertions.assertThat(client.getRef().getReference())
+ .isEqualTo(api.getReference().refName("main").get());
+ }
+
+ @Test
+ public void testWithNullHash() throws NessieNotFoundException {
+ NessieIcebergClient client = new NessieIcebergClient(api, BRANCH, null, ImmutableMap.of());
+ Assertions.assertThat(client.getRef().getReference())
+ .isEqualTo(api.getReference().refName(BRANCH).get());
+ }
+
+ @Test
+ public void testWithReference() throws NessieNotFoundException {
+ NessieIcebergClient client = new NessieIcebergClient(api, "main", null, ImmutableMap.of());
+
+ Assertions.assertThat(client.withReference(null, null)).isEqualTo(client);
+ Assertions.assertThat(client.withReference("main", null)).isNotEqualTo(client);
+ Assertions.assertThat(
+ client.withReference("main", api.getReference().refName("main").get().getHash()))
+ .isEqualTo(client);
+
+ Assertions.assertThat(client.withReference(BRANCH, null)).isNotEqualTo(client);
+ Assertions.assertThat(
+ client.withReference(BRANCH, api.getReference().refName(BRANCH).get().getHash()))
+ .isNotEqualTo(client);
+ }
+
+ @Test
+ public void testWithReferenceAfterRecreatingBranch() throws NessieConflictException, NessieNotFoundException {
+ String branch = "branchToBeDropped";
+ createBranch(branch, null);
+ NessieIcebergClient client = new NessieIcebergClient(api, branch, null, ImmutableMap.of());
+
+ // just create a new commit on the branch and then delete & re-create it
+ Namespace namespace = Namespace.of("a");
+ client.createNamespace(namespace, ImmutableMap.of());
+ Assertions.assertThat(client.listNamespaces(namespace)).isNotNull();
+ client.getApi().deleteBranch().branch((Branch) client.getApi().getReference().refName(branch).get()).delete();
+ createBranch(branch, null);
+
+ // make sure the client uses the re-created branch
+ Reference ref = client.getApi().getReference().refName(branch).get();
+ Assertions.assertThat(client.withReference(branch, null).getRef().getReference()).isEqualTo(ref);
+ Assertions.assertThat(client.withReference(branch, null)).isNotEqualTo(client);
+ }
+}
diff --git a/versions.props b/versions.props
index 3a8203cf2..414d19c7a 100644
--- a/versions.props
+++ b/versions.props
@@ -24,7 +24,7 @@ javax.xml.bind:jaxb-api = 2.3.1
javax.activation:activation = 1.1.1
org.glassfish.jaxb:jaxb-runtime = 2.3.3
software.amazon.awssdk:* = 2.17.131
-org.projectnessie:* = 0.28.0
+org.projectnessie:* = 0.29.0
com.google.cloud:libraries-bom = 24.1.0
org.scala-lang.modules:scala-collection-compat_2.12 = 2.6.0
org.scala-lang.modules:scala-collection-compat_2.13 = 2.6.0