Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/10/26 18:09:22 UTC

[GitHub] [iceberg] rymurr commented on a change in pull request #1587: Nessie support for core and Spark 2/3

rymurr commented on a change in pull request #1587:
URL: https://github.com/apache/iceberg/pull/1587#discussion_r511959931



##########
File path: build.gradle
##########
@@ -891,6 +924,49 @@ project(':iceberg-pig') {
   }
 }
 
+project(':iceberg-nessie') {
+  apply plugin: 'org.projectnessie'

Review comment:
       It uses the `quarkusAppRunnerConfig` dependencies to discover the Nessie Quarkus server and its dependencies, then uses that to start a server. Discovering all of the runtime dependencies is non-trivial and requires the full gradle dependency graph, which is why it is hard to do from inside a test suite. I believe the primary reason for all of this is to make it easy to build GraalVM native images.
   
   See https://github.com/projectnessie/nessie/tree/main/tools/apprunner-gradle-plugin for the actual code.

##########
File path: nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.nessie;
+
+import com.dremio.nessie.api.TreeApi;
+import com.dremio.nessie.client.NessieClient;
+import com.dremio.nessie.client.NessieClient.AuthType;
+import com.dremio.nessie.error.NessieConflictException;
+import com.dremio.nessie.error.NessieNotFoundException;
+import com.dremio.nessie.model.Contents;
+import com.dremio.nessie.model.ContentsKey;
+import com.dremio.nessie.model.EntriesResponse;
+import com.dremio.nessie.model.IcebergTable;
+import com.dremio.nessie.model.ImmutableDelete;
+import com.dremio.nessie.model.ImmutableMultiContents;
+import com.dremio.nessie.model.ImmutablePut;
+import com.dremio.nessie.model.MultiContents;
+import com.dremio.nessie.model.Reference;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+
+/**
+ * Nessie implementation of Iceberg Catalog.
+ */
+public class NessieCatalog extends BaseMetastoreCatalog implements AutoCloseable {
+
+  public static final String CONF_NESSIE_URL = "nessie.url";
+  public static final String CONF_NESSIE_USERNAME = "nessie.username";
+  public static final String CONF_NESSIE_PASSWORD = "nessie.password";
+  public static final String CONF_NESSIE_AUTH_TYPE = "nessie.auth_type";
+  public static final String NESSIE_AUTH_TYPE_DEFAULT = "BASIC";
+  public static final String CONF_NESSIE_REF = "nessie.ref";
+
+  private static final Joiner SLASH = Joiner.on("/");
+  private static final String ICEBERG_HADOOP_WAREHOUSE_BASE = "iceberg/warehouse";
+  private final NessieClient client;
+  private final String warehouseLocation;
+  private final Configuration config;
+  private final UpdateableReference reference;
+  private final String name;
+
+  /**
+   * create a catalog from a hadoop configuration.
+   */
+  public NessieCatalog(Configuration config) {
+    this("nessie", config);
+  }
+
+  /**
+   * create a catalog from a hadoop configuration.
+   */
+  public NessieCatalog(Configuration config, String ref) {
+    this("nessie", config, ref);
+  }
+
+  /**
+   * Create a catalog with a known name from a hadoop configuration.
+   */
+  public NessieCatalog(String name, Configuration config) {
+    this(name, config, null);
+  }
+
+  /**
+   * Create a catalog with a known name from a hadoop configuration.
+   */
+  public NessieCatalog(String name, Configuration config, String ref) {
+    this(name, config, ref, null);
+  }
+
+  /**
+   * Create a catalog with a known name from a hadoop configuration.
+   */
+  public NessieCatalog(String name, Configuration config, String ref, String url) {
+    this.config = config;
+    this.name = name;
+    String path = url == null ? config.get(CONF_NESSIE_URL) : url;
+    String username = config.get(CONF_NESSIE_USERNAME);
+    String password = config.get(CONF_NESSIE_PASSWORD);
+    String authTypeStr = config.get(CONF_NESSIE_AUTH_TYPE, NESSIE_AUTH_TYPE_DEFAULT);
+    AuthType authType = AuthType.valueOf(authTypeStr);
+    this.client = new NessieClient(authType, path, username, password);
+
+    warehouseLocation = getWarehouseLocation();
+
+    final String requestedRef = ref != null ? ref : config.get(CONF_NESSIE_REF);
+    this.reference = get(requestedRef);
+  }
+
+  private String getWarehouseLocation() {
+    String nessieWarehouseDir = config.get("nessie.warehouse.dir");

Review comment:
       I have cleaned this up a bit and tried to follow the pattern you suggested in #1640.

##########
File path: nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.nessie;
+
+import com.dremio.nessie.client.NessieClient;
+import com.dremio.nessie.error.NessieConflictException;
+import com.dremio.nessie.error.NessieNotFoundException;
+import com.dremio.nessie.model.Contents;
+import com.dremio.nessie.model.ContentsKey;
+import com.dremio.nessie.model.IcebergTable;
+import com.dremio.nessie.model.ImmutableIcebergTable;
+import java.lang.reflect.Method;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.FileIO;
+
+/**
+ * Nessie implementation of Iceberg TableOperations.
+ */
+public class NessieTableOperations extends BaseMetastoreTableOperations {
+
+  private static Method sparkConfMethod;
+  private static Method appIdMethod;
+  private static Method sparkEnvMethod;
+
+  private final Configuration conf;
+  private final NessieClient client;
+  private final ContentsKey key;
+  private UpdateableReference reference;
+  private IcebergTable table;
+  private HadoopFileIO fileIO;
+
+  /**
+   * Create a nessie table operations given a table identifier.
+   */
+  public NessieTableOperations(
+      Configuration conf,
+      ContentsKey key,
+      UpdateableReference reference,
+      NessieClient client) {
+    this.conf = conf;
+    this.key = key;
+    this.reference = reference;
+    this.client = client;
+  }
+
+  @Override
+  protected void doRefresh() {
+    // break reference with parent (to avoid cross-over refresh)
+    // TODO, confirm this is correct behavior.
+    // reference = reference.copy();
+
+    reference.refresh();
+    String metadataLocation = null;
+    try {
+      Contents contents = client.getContentsApi().getContents(key, reference.getHash());
+      this.table = contents.unwrap(IcebergTable.class)
+          .orElseThrow(() -> new IllegalStateException("Nessie points to a non-Iceberg object for that path."));

Review comment:
       fixed

##########
File path: nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.nessie;
+
+import com.dremio.nessie.api.TreeApi;
+import com.dremio.nessie.client.NessieClient;
+import com.dremio.nessie.error.NessieConflictException;
+import com.dremio.nessie.error.NessieNotFoundException;
+import com.dremio.nessie.model.Contents;
+import com.dremio.nessie.model.ContentsKey;
+import com.dremio.nessie.model.EntriesResponse;
+import com.dremio.nessie.model.IcebergTable;
+import com.dremio.nessie.model.ImmutableDelete;
+import com.dremio.nessie.model.ImmutableMultiContents;
+import com.dremio.nessie.model.ImmutablePut;
+import com.dremio.nessie.model.MultiContents;
+import com.dremio.nessie.model.Reference;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+
+/**
+ * Nessie implementation of Iceberg Catalog.
+ */
+public class NessieCatalog extends BaseMetastoreCatalog implements AutoCloseable {
+
+  private static final Joiner SLASH = Joiner.on("/");
+  private static final String ICEBERG_HADOOP_WAREHOUSE_BASE = "iceberg/warehouse";
+  private final NessieClient client;
+  private final String warehouseLocation;
+  private final Configuration config;
+  private final UpdateableReference reference;
+  private final String name;
+
+  /**
+   * create a catalog from a hadoop configuration.
+   */
+  public NessieCatalog(Configuration config) {
+    this("nessie", config);
+  }
+
+  /**
+   * create a catalog from a hadoop configuration.
+   */
+  public NessieCatalog(Configuration config, String ref) {
+    this("nessie", config, ref);
+  }
+
+  /**
+   * Create a catalog with a known name from a hadoop configuration.
+   */
+  public NessieCatalog(String name, Configuration config) {
+    this(name, config, null);
+  }
+
+  /**
+   * Create a catalog with a known name from a hadoop configuration.
+   */
+  public NessieCatalog(String name, Configuration config, String ref) {
+    this(name, config, ref, null);
+  }
+
+  /**
+   * Create a catalog with a known name from a hadoop configuration.
+   */
+  public NessieCatalog(String name, Configuration config, String ref, String url) {
+    this.config = config;
+    this.name = name;
+
+    this.client = NessieClient.withConfig(s -> {
+      if (s.equals(NessieClient.CONF_NESSIE_URL)) {
+        return url == null ? config.get(s) : url;
+      }
+      return config.get(s);
+    });
+
+    warehouseLocation = getWarehouseLocation();
+
+    final String requestedRef = ref != null ? ref : config.get(NessieClient.CONF_NESSIE_REF);
+    this.reference = get(requestedRef);
+  }
+
+  private String getWarehouseLocation() {
+    String nessieWarehouseDir = config.get("nessie.warehouse.dir");
+    if (nessieWarehouseDir != null) {
+      return nessieWarehouseDir;
+    }
+    String hiveWarehouseDir = config.get("hive.metastore.warehouse.dir");
+    if (hiveWarehouseDir != null) {
+      return hiveWarehouseDir;
+    }
+    String defaultFS = config.get("fs.defaultFS");
+    if (defaultFS != null) {
+      return defaultFS + "/" + ICEBERG_HADOOP_WAREHOUSE_BASE;
+    }
+    throw new IllegalStateException("Don't know where to put the nessie iceberg data. " +
+        "Please set one of the following:\n" +
+        "nessie.warehouse.dir\n" +
+        "hive.metastore.warehouse.dir\n" +
+        "fs.defaultFS.");
+  }
+
+  private UpdateableReference get(String requestedRef) {
+    try {
+      Reference ref = requestedRef == null ? client.getTreeApi().getDefaultBranch()
+          : client.getTreeApi().getReferenceByName(requestedRef);
+      return new UpdateableReference(ref, client.getTreeApi());
+    } catch (NessieNotFoundException ex) {
+      if (requestedRef != null) {
+        throw new IllegalArgumentException(String.format("Nessie ref '%s' provided via %s does not exist. " +
+          "This ref must exist before creating a NessieCatalog.", requestedRef, NessieClient.CONF_NESSIE_REF), ex);
+      }
+
+      throw new IllegalArgumentException(String.format("Nessie does not have an existing default branch." +
+        "Either configure an alternative ref via %s or create the default branch on the server.",
+          NessieClient.CONF_NESSIE_REF), ex);
+    }
+  }
+
+  @Override
+  public void close() {
+    client.close();
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+
+  private static ContentsKey toKey(TableIdentifier tableIdentifier) {
+    List<String> identifiers = new ArrayList<>();
+    if (tableIdentifier.hasNamespace()) {
+      identifiers.addAll(Arrays.asList(tableIdentifier.namespace().levels()));
+    }
+    identifiers.add(tableIdentifier.name());
+
+    ContentsKey key = new ContentsKey(identifiers);
+    return key;
+  }
+
+  private IcebergTable table(TableIdentifier tableIdentifier) {
+    try {
+      Contents table = client.getContentsApi().getContents(toKey(tableIdentifier), reference.getHash());
+      if (table instanceof IcebergTable) {
+        return (IcebergTable) table;
+      }
+    } catch (NessieNotFoundException e) {
+      // ignore
+    }
+    return null;
+  }
+
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    ParsedTableIdentifier pti = ParsedTableIdentifier.getParsedTableIdentifier(tableIdentifier, new HashMap<>());
+    UpdateableReference newReference = this.reference;
+    if (pti.getReference() != null) {
+      newReference = get(pti.getReference());
+    }
+    return new NessieTableOperations(config,
+                                     toKey(pti.getTableIdentifier()),
+                                     newReference,
+                                     client);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier table) {
+    if (table.hasNamespace()) {
+      return SLASH.join(warehouseLocation, table.namespace().toString(), table.name());
+    }
+    return SLASH.join(warehouseLocation, table.name());
+  }
+
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    try {
+      return client.getTreeApi()
+          .getEntries(reference.getHash())
+          .getEntries()
+          .stream()
+          .filter(namespacePredicate(namespace))
+          .map(NessieCatalog::toIdentifier)
+          .collect(Collectors.toList());
+    } catch (NessieNotFoundException ex) {
+      throw new RuntimeException("Unable to list tables due to missing ref.", ex);

Review comment:
       :+1: 

##########
File path: build.gradle
##########
@@ -693,6 +709,8 @@ if (jdkVersion == '8') {
       // Vectorized reads need more memory
       maxHeapSize '2500m'
     }
+    // start and stop quarkus for nessie tests
+    tasks.test.dependsOn("quarkus-start").finalizedBy("quarkus-stop")

Review comment:
       Quarkus (which is the underlying HTTP framework) behaves slightly counterintuitively here: it doesn't offer a way to start Nessie from inside a test suite the way you can start the Hive metastore. Hence we start it once per module via gradle, and the test suites are responsible for cleanup.

##########
File path: nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.nessie;
+
+import com.dremio.nessie.api.TreeApi;
+import com.dremio.nessie.client.NessieClient;
+import com.dremio.nessie.client.NessieClient.AuthType;
+import com.dremio.nessie.error.NessieConflictException;
+import com.dremio.nessie.error.NessieNotFoundException;
+import com.dremio.nessie.model.Contents;
+import com.dremio.nessie.model.ContentsKey;
+import com.dremio.nessie.model.EntriesResponse;
+import com.dremio.nessie.model.IcebergTable;
+import com.dremio.nessie.model.ImmutableDelete;
+import com.dremio.nessie.model.ImmutableMultiContents;
+import com.dremio.nessie.model.ImmutablePut;
+import com.dremio.nessie.model.MultiContents;
+import com.dremio.nessie.model.Reference;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+
+/**
+ * Nessie implementation of Iceberg Catalog.
+ */
+public class NessieCatalog extends BaseMetastoreCatalog implements AutoCloseable {

Review comment:
       I will take another pass at this today; I can see totally valid reasons to support listing namespaces if they have tables in them. The problem, as I see it, comes from creating or deleting namespaces and from storing namespace metadata.
   
   * create/delete: in Nessie (similar to git) a namespace is created implicitly with the first table in that namespace tree and deleted with the last table in it. Separate create/delete operations in Nessie are either no-ops or require a dummy object to be placed in the namespace, both of which are odd. E.g. if it is a no-op, then creating namespace `foo.bar` and immediately asking whether `foo.bar` exists will return `false` (see the sketch after this list).
   
   * namespace metadata: what is the use case envisioned for those operations? I think for Nessie we would start with the same behaviour as the HDFS catalog, but I am curious to know the benefit of supporting those APIs.
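   
   To make the create/delete point concrete, here is a hypothetical sequence assuming `createNamespace` were wired up as a no-op (`catalog` and `schema` are placeholders, not code from this PR):
   
   ```java
   catalog.createNamespace(Namespace.of("foo", "bar"));   // no-op: nothing is stored in Nessie
   catalog.namespaceExists(Namespace.of("foo", "bar"));   // false: no table lives under foo.bar yet
   
   catalog.createTable(TableIdentifier.of("foo", "bar", "t"), schema);
   catalog.namespaceExists(Namespace.of("foo", "bar"));   // true: created implicitly with the table
   
   catalog.dropTable(TableIdentifier.of("foo", "bar", "t"));
   catalog.namespaceExists(Namespace.of("foo", "bar"));   // false again: removed with its last table
   ```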

##########
File path: nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.nessie;
+
+import com.dremio.nessie.api.TreeApi;
+import com.dremio.nessie.client.NessieClient;
+import com.dremio.nessie.client.NessieClient.AuthType;
+import com.dremio.nessie.error.NessieConflictException;
+import com.dremio.nessie.error.NessieNotFoundException;
+import com.dremio.nessie.model.Contents;
+import com.dremio.nessie.model.ContentsKey;
+import com.dremio.nessie.model.EntriesResponse;
+import com.dremio.nessie.model.IcebergTable;
+import com.dremio.nessie.model.ImmutableDelete;
+import com.dremio.nessie.model.ImmutableMultiContents;
+import com.dremio.nessie.model.ImmutablePut;
+import com.dremio.nessie.model.MultiContents;
+import com.dremio.nessie.model.Reference;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+
+/**
+ * Nessie implementation of Iceberg Catalog.
+ */
+public class NessieCatalog extends BaseMetastoreCatalog implements AutoCloseable {

Review comment:
       Having another look, we could add valid implementations of `namespaceExists` and `listNamespaces` and either no-op or throw for the others. Then clients can still navigate namespaces. Thoughts?
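   
   Roughly what I have in mind, as an untested sketch of methods that could live in `NessieCatalog` (assuming it implements `SupportsNamespaces`); it derives namespaces from the tables that already exist instead of storing anything extra:
   
   ```java
   @Override
   public List<Namespace> listNamespaces(Namespace namespace) {
     // Namespaces are implicit: derive them from the tables under the given prefix.
     // Note this returns nested namespaces as well, not only direct children.
     return listTables(namespace).stream()
         .map(TableIdentifier::namespace)
         .distinct()
         .collect(Collectors.toList());
   }
   
   @Override
   public boolean namespaceExists(Namespace namespace) {
     // A namespace "exists" as long as at least one table lives in or below it.
     return !listTables(namespace).isEmpty();
   }
   ```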

##########
File path: spark3/src/test/java/org/apache/iceberg/spark/sql/TestNamespaceSQL.java
##########
@@ -56,6 +56,8 @@ public void cleanNamespaces() {
 
   @Test
   public void testCreateNamespace() {
+    // Nessie namespaces are implicit and do not need to be explicitly managed
+    Assume.assumeFalse(catalogName.endsWith("testnessie"));

Review comment:
       Sure, the hadoop catalog is also skipped for most of these. It makes sense to have separate tests.

##########
File path: nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.nessie;
+
+import com.dremio.nessie.api.TreeApi;
+import com.dremio.nessie.client.NessieClient;
+import com.dremio.nessie.error.NessieConflictException;
+import com.dremio.nessie.error.NessieNotFoundException;
+import com.dremio.nessie.model.Contents;
+import com.dremio.nessie.model.ContentsKey;
+import com.dremio.nessie.model.EntriesResponse;
+import com.dremio.nessie.model.IcebergTable;
+import com.dremio.nessie.model.ImmutableDelete;
+import com.dremio.nessie.model.ImmutableMultiContents;
+import com.dremio.nessie.model.ImmutablePut;
+import com.dremio.nessie.model.MultiContents;
+import com.dremio.nessie.model.Reference;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+
+/**
+ * Nessie implementation of Iceberg Catalog.
+ */
+public class NessieCatalog extends BaseMetastoreCatalog implements AutoCloseable {
+
+  private static final Joiner SLASH = Joiner.on("/");
+  private static final String ICEBERG_HADOOP_WAREHOUSE_BASE = "iceberg/warehouse";
+  private final NessieClient client;
+  private final String warehouseLocation;
+  private final Configuration config;
+  private final UpdateableReference reference;
+  private final String name;
+
+  /**
+   * create a catalog from a hadoop configuration.
+   */
+  public NessieCatalog(Configuration config) {
+    this("nessie", config);
+  }
+
+  /**
+   * create a catalog from a hadoop configuration.
+   */
+  public NessieCatalog(Configuration config, String ref) {
+    this("nessie", config, ref);
+  }
+
+  /**
+   * Create a catalog with a known name from a hadoop configuration.
+   */
+  public NessieCatalog(String name, Configuration config) {
+    this(name, config, null);
+  }
+
+  /**
+   * Create a catalog with a known name from a hadoop configuration.
+   */
+  public NessieCatalog(String name, Configuration config, String ref) {
+    this(name, config, ref, null);
+  }
+
+  /**
+   * Create a catalog with a known name from a hadoop configuration.
+   */
+  public NessieCatalog(String name, Configuration config, String ref, String url) {
+    this.config = config;
+    this.name = name;
+
+    this.client = NessieClient.withConfig(s -> {
+      if (s.equals(NessieClient.CONF_NESSIE_URL)) {
+        return url == null ? config.get(s) : url;
+      }
+      return config.get(s);
+    });
+
+    warehouseLocation = getWarehouseLocation();
+
+    final String requestedRef = ref != null ? ref : config.get(NessieClient.CONF_NESSIE_REF);
+    this.reference = get(requestedRef);
+  }
+
+  private String getWarehouseLocation() {
+    String nessieWarehouseDir = config.get("nessie.warehouse.dir");
+    if (nessieWarehouseDir != null) {
+      return nessieWarehouseDir;
+    }
+    String hiveWarehouseDir = config.get("hive.metastore.warehouse.dir");
+    if (hiveWarehouseDir != null) {
+      return hiveWarehouseDir;
+    }
+    String defaultFS = config.get("fs.defaultFS");
+    if (defaultFS != null) {
+      return defaultFS + "/" + ICEBERG_HADOOP_WAREHOUSE_BASE;
+    }
+    throw new IllegalStateException("Don't know where to put the nessie iceberg data. " +
+        "Please set one of the following:\n" +
+        "nessie.warehouse.dir\n" +
+        "hive.metastore.warehouse.dir\n" +
+        "fs.defaultFS.");
+  }
+
+  private UpdateableReference get(String requestedRef) {
+    try {
+      Reference ref = requestedRef == null ? client.getTreeApi().getDefaultBranch()
+          : client.getTreeApi().getReferenceByName(requestedRef);
+      return new UpdateableReference(ref, client.getTreeApi());
+    } catch (NessieNotFoundException ex) {
+      if (requestedRef != null) {
+        throw new IllegalArgumentException(String.format("Nessie ref '%s' provided via %s does not exist. " +
+          "This ref must exist before creating a NessieCatalog.", requestedRef, NessieClient.CONF_NESSIE_REF), ex);
+      }
+
+      throw new IllegalArgumentException(String.format("Nessie does not have an existing default branch." +
+        "Either configure an alternative ref via %s or create the default branch on the server.",
+          NessieClient.CONF_NESSIE_REF), ex);
+    }
+  }
+
+  @Override
+  public void close() {
+    client.close();
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+
+  private static ContentsKey toKey(TableIdentifier tableIdentifier) {
+    List<String> identifiers = new ArrayList<>();
+    if (tableIdentifier.hasNamespace()) {
+      identifiers.addAll(Arrays.asList(tableIdentifier.namespace().levels()));
+    }
+    identifiers.add(tableIdentifier.name());
+
+    ContentsKey key = new ContentsKey(identifiers);
+    return key;
+  }
+
+  private IcebergTable table(TableIdentifier tableIdentifier) {
+    try {
+      Contents table = client.getContentsApi().getContents(toKey(tableIdentifier), reference.getHash());
+      if (table instanceof IcebergTable) {
+        return (IcebergTable) table;
+      }
+    } catch (NessieNotFoundException e) {
+      // ignore
+    }
+    return null;
+  }
+
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    ParsedTableIdentifier pti = ParsedTableIdentifier.getParsedTableIdentifier(tableIdentifier, new HashMap<>());
+    UpdateableReference newReference = this.reference;
+    if (pti.getReference() != null) {
+      newReference = get(pti.getReference());
+    }
+    return new NessieTableOperations(config,
+                                     toKey(pti.getTableIdentifier()),
+                                     newReference,
+                                     client);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier table) {
+    if (table.hasNamespace()) {
+      return SLASH.join(warehouseLocation, table.namespace().toString(), table.name());
+    }
+    return SLASH.join(warehouseLocation, table.name());
+  }
+
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    try {
+      return client.getTreeApi()
+          .getEntries(reference.getHash())
+          .getEntries()
+          .stream()
+          .filter(namespacePredicate(namespace))
+          .map(NessieCatalog::toIdentifier)
+          .collect(Collectors.toList());

Review comment:
       Just checked, and the contract is `Return all the identifiers under this namespace.` I took this to mean everything under this namespace and all sub-namespaces. If that was not the intention of the method, I will fix the predicate.

##########
File path: nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.nessie;
+
+import com.dremio.nessie.api.TreeApi;
+import com.dremio.nessie.client.NessieClient;
+import com.dremio.nessie.error.NessieConflictException;
+import com.dremio.nessie.error.NessieNotFoundException;
+import com.dremio.nessie.model.Contents;
+import com.dremio.nessie.model.ContentsKey;
+import com.dremio.nessie.model.EntriesResponse;
+import com.dremio.nessie.model.IcebergTable;
+import com.dremio.nessie.model.ImmutableDelete;
+import com.dremio.nessie.model.ImmutableMultiContents;
+import com.dremio.nessie.model.ImmutablePut;
+import com.dremio.nessie.model.MultiContents;
+import com.dremio.nessie.model.Reference;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+
+/**
+ * Nessie implementation of Iceberg Catalog.
+ */
+public class NessieCatalog extends BaseMetastoreCatalog implements AutoCloseable {
+
+  private static final Joiner SLASH = Joiner.on("/");
+  private static final String ICEBERG_HADOOP_WAREHOUSE_BASE = "iceberg/warehouse";
+  private final NessieClient client;
+  private final String warehouseLocation;
+  private final Configuration config;
+  private final UpdateableReference reference;
+  private final String name;
+
+  /**
+   * create a catalog from a hadoop configuration.
+   */
+  public NessieCatalog(Configuration config) {
+    this("nessie", config);
+  }
+
+  /**
+   * create a catalog from a hadoop configuration.
+   */
+  public NessieCatalog(Configuration config, String ref) {
+    this("nessie", config, ref);
+  }
+
+  /**
+   * Create a catalog with a known name from a hadoop configuration.
+   */
+  public NessieCatalog(String name, Configuration config) {
+    this(name, config, null);
+  }
+
+  /**
+   * Create a catalog with a known name from a hadoop configuration.
+   */
+  public NessieCatalog(String name, Configuration config, String ref) {
+    this(name, config, ref, null);
+  }
+
+  /**
+   * Create a catalog with a known name from a hadoop configuration.
+   */
+  public NessieCatalog(String name, Configuration config, String ref, String url) {
+    this.config = config;
+    this.name = name;
+
+    this.client = NessieClient.withConfig(s -> {
+      if (s.equals(NessieClient.CONF_NESSIE_URL)) {
+        return url == null ? config.get(s) : url;
+      }
+      return config.get(s);
+    });
+
+    warehouseLocation = getWarehouseLocation();
+
+    final String requestedRef = ref != null ? ref : config.get(NessieClient.CONF_NESSIE_REF);
+    this.reference = get(requestedRef);
+  }
+
+  private String getWarehouseLocation() {
+    String nessieWarehouseDir = config.get("nessie.warehouse.dir");
+    if (nessieWarehouseDir != null) {
+      return nessieWarehouseDir;
+    }
+    String hiveWarehouseDir = config.get("hive.metastore.warehouse.dir");
+    if (hiveWarehouseDir != null) {
+      return hiveWarehouseDir;
+    }
+    String defaultFS = config.get("fs.defaultFS");
+    if (defaultFS != null) {
+      return defaultFS + "/" + ICEBERG_HADOOP_WAREHOUSE_BASE;
+    }
+    throw new IllegalStateException("Don't know where to put the nessie iceberg data. " +
+        "Please set one of the following:\n" +
+        "nessie.warehouse.dir\n" +
+        "hive.metastore.warehouse.dir\n" +
+        "fs.defaultFS.");
+  }
+
+  private UpdateableReference get(String requestedRef) {
+    try {
+      Reference ref = requestedRef == null ? client.getTreeApi().getDefaultBranch()
+          : client.getTreeApi().getReferenceByName(requestedRef);
+      return new UpdateableReference(ref, client.getTreeApi());
+    } catch (NessieNotFoundException ex) {
+      if (requestedRef != null) {
+        throw new IllegalArgumentException(String.format("Nessie ref '%s' provided via %s does not exist. " +
+          "This ref must exist before creating a NessieCatalog.", requestedRef, NessieClient.CONF_NESSIE_REF), ex);
+      }
+
+      throw new IllegalArgumentException(String.format("Nessie does not have an existing default branch." +
+        "Either configure an alternative ref via %s or create the default branch on the server.",
+          NessieClient.CONF_NESSIE_REF), ex);
+    }
+  }
+
+  @Override
+  public void close() {
+    client.close();
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+
+  private static ContentsKey toKey(TableIdentifier tableIdentifier) {
+    List<String> identifiers = new ArrayList<>();
+    if (tableIdentifier.hasNamespace()) {
+      identifiers.addAll(Arrays.asList(tableIdentifier.namespace().levels()));
+    }
+    identifiers.add(tableIdentifier.name());
+
+    ContentsKey key = new ContentsKey(identifiers);
+    return key;
+  }
+
+  private IcebergTable table(TableIdentifier tableIdentifier) {
+    try {
+      Contents table = client.getContentsApi().getContents(toKey(tableIdentifier), reference.getHash());
+      if (table instanceof IcebergTable) {
+        return (IcebergTable) table;
+      }
+    } catch (NessieNotFoundException e) {
+      // ignore
+    }
+    return null;
+  }
+
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    ParsedTableIdentifier pti = ParsedTableIdentifier.getParsedTableIdentifier(tableIdentifier, new HashMap<>());
+    UpdateableReference newReference = this.reference;
+    if (pti.getReference() != null) {
+      newReference = get(pti.getReference());
+    }
+    return new NessieTableOperations(config,
+                                     toKey(pti.getTableIdentifier()),
+                                     newReference,
+                                     client);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier table) {
+    if (table.hasNamespace()) {
+      return SLASH.join(warehouseLocation, table.namespace().toString(), table.name());
+    }
+    return SLASH.join(warehouseLocation, table.name());
+  }
+
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    try {
+      return client.getTreeApi()
+          .getEntries(reference.getHash())
+          .getEntries()
+          .stream()
+          .filter(namespacePredicate(namespace))
+          .map(NessieCatalog::toIdentifier)
+          .collect(Collectors.toList());

Review comment:
       You are correct, it will return everything in and below `namespace`. What is the contract supposed to be? Only tables directly in this namespace?
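   
   To make the two readings concrete, this is roughly how they differ when expressed on the `TableIdentifier` that `toIdentifier` produces for each entry (untested sketch; `namespace` is the argument to `listTables`, and the real filtering happens in `namespacePredicate`):
   
   ```java
   // 1) everything in and below the requested namespace (current behaviour)
   Predicate<TableIdentifier> inOrBelow = id -> {
     String[] levels = id.namespace().levels();
     String[] wanted = namespace.levels();
     return levels.length >= wanted.length &&
         Arrays.equals(Arrays.copyOf(levels, wanted.length), wanted);
   };
   
   // 2) only tables whose namespace matches exactly (direct children only)
   Predicate<TableIdentifier> exactMatch = id -> id.namespace().equals(namespace);
   ```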

##########
File path: spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java
##########
@@ -50,6 +50,7 @@ public TestCreateTableAsSelect(String catalogName, String implementation, Map<St
   @After
   public void removeTables() {
     sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %s", sourceName);

Review comment:
       The way I was running the test caused the table to be deleted on the backend Nessie server but not in the cached Spark context. I will clean this up as part of the Spark rework.

##########
File path: nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.nessie;
+
+import com.dremio.nessie.client.NessieClient;
+import com.dremio.nessie.error.NessieConflictException;
+import com.dremio.nessie.error.NessieNotFoundException;
+import com.dremio.nessie.model.Contents;
+import com.dremio.nessie.model.ContentsKey;
+import com.dremio.nessie.model.IcebergTable;
+import com.dremio.nessie.model.ImmutableIcebergTable;
+import java.lang.reflect.Method;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.FileIO;
+
+/**
+ * Nessie implementation of Iceberg TableOperations.
+ */
+public class NessieTableOperations extends BaseMetastoreTableOperations {
+
+  private static Method sparkConfMethod;
+  private static Method appIdMethod;
+  private static Method sparkEnvMethod;
+
+  private final Configuration conf;
+  private final NessieClient client;
+  private final ContentsKey key;
+  private UpdateableReference reference;
+  private IcebergTable table;
+  private HadoopFileIO fileIO;
+
+  /**
+   * Create a nessie table operations given a table identifier.
+   */
+  public NessieTableOperations(
+      Configuration conf,
+      ContentsKey key,
+      UpdateableReference reference,
+      NessieClient client) {
+    this.conf = conf;
+    this.key = key;
+    this.reference = reference;
+    this.client = client;
+  }
+
+  @Override
+  protected void doRefresh() {
+    // break reference with parent (to avoid cross-over refresh)
+    // TODO, confirm this is correct behavior.
+    // reference = reference.copy();
+
+    reference.refresh();
+    String metadataLocation = null;
+    try {
+      Contents contents = client.getContentsApi().getContents(key, reference.getHash());
+      this.table = contents.unwrap(IcebergTable.class)
+          .orElseThrow(() -> new IllegalStateException("Nessie points to a non-Iceberg object for that path."));
+      metadataLocation = table.getMetadataLocation();
+    } catch (NessieNotFoundException ex) {
+      this.table = null;
+    }
+    refreshFromMetadataLocation(metadataLocation, 2);
+  }
+
+  @Override
+  protected void doCommit(TableMetadata base, TableMetadata metadata) {
+    reference.checkMutable();
+
+    String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);
+
+    try {
+      IcebergTable newTable = ImmutableIcebergTable.builder().metadataLocation(newMetadataLocation).build();
+      client.getContentsApi().setContents(key,
+                                          reference.getAsBranch().getName(),
+                                          reference.getHash(),
+                                          String.format("iceberg commit%s", applicationId()),
+                                          newTable);
+    } catch (NessieNotFoundException | NessieConflictException ex) {
+      io().deleteFile(newMetadataLocation);
+      throw new CommitFailedException(ex, "failed");
+    } catch (Throwable e) {
+      io().deleteFile(newMetadataLocation);
+      throw new RuntimeException("Unexpected commit exception", e);
+    }
+  }
+
+  @Override
+  public FileIO io() {
+    if (fileIO == null) {
+      fileIO = new HadoopFileIO(conf);
+    }
+
+    return fileIO;
+  }
+
+  /**
+   * try and get a Spark application id if one exists.
+   *
+   * <p>
+   *   We haven't figured out a general way to pass commit messages through to the Nessie committer yet.
+   *   This is hacky but gets the job done until we can have a more complete commit/audit log.
+   * </p>
+   */
+  private static String applicationId() {
+    try {
+      if (sparkConfMethod == null) {
+        Class sparkEnvClazz = Class.forName("org.apache.spark.SparkEnv");
+        sparkEnvMethod = sparkEnvClazz.getMethod("get");
+        Class sparkConfClazz = Class.forName("org.apache.spark.SparkConf");
+        sparkConfMethod = sparkEnvClazz.getMethod("conf");
+        appIdMethod = sparkConfClazz.getMethod("getAppId");

Review comment:
       :+1: 
   

##########
File path: nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.nessie;
+
+import com.dremio.nessie.client.NessieClient;
+import com.dremio.nessie.error.NessieConflictException;
+import com.dremio.nessie.error.NessieNotFoundException;
+import com.dremio.nessie.model.Contents;
+import com.dremio.nessie.model.ContentsKey;
+import com.dremio.nessie.model.IcebergTable;
+import com.dremio.nessie.model.ImmutableIcebergTable;
+import java.lang.reflect.Method;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.FileIO;
+
+/**
+ * Nessie implementation of Iceberg TableOperations.
+ */
+public class NessieTableOperations extends BaseMetastoreTableOperations {
+
+  private static Method sparkConfMethod;
+  private static Method appIdMethod;
+  private static Method sparkEnvMethod;
+
+  private final Configuration conf;
+  private final NessieClient client;
+  private final ContentsKey key;
+  private UpdateableReference reference;
+  private IcebergTable table;
+  private HadoopFileIO fileIO;
+
+  /**
+   * Create a nessie table operations given a table identifier.
+   */
+  public NessieTableOperations(
+      Configuration conf,
+      ContentsKey key,
+      UpdateableReference reference,
+      NessieClient client) {
+    this.conf = conf;
+    this.key = key;
+    this.reference = reference;
+    this.client = client;
+  }
+
+  @Override
+  protected void doRefresh() {
+    // break reference with parent (to avoid cross-over refresh)
+    // TODO, confirm this is correct behavior.
+    // reference = reference.copy();
+
+    reference.refresh();
+    String metadataLocation = null;
+    try {
+      Contents contents = client.getContentsApi().getContents(key, reference.getHash());
+      this.table = contents.unwrap(IcebergTable.class)
+          .orElseThrow(() -> new IllegalStateException("Nessie points to a non-Iceberg object for that path."));
+      metadataLocation = table.getMetadataLocation();
+    } catch (NessieNotFoundException ex) {
+      this.table = null;
+    }
+    refreshFromMetadataLocation(metadataLocation, 2);
+  }
+
+  @Override
+  protected void doCommit(TableMetadata base, TableMetadata metadata) {
+    reference.checkMutable();
+
+    String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);
+
+    try {
+      IcebergTable newTable = ImmutableIcebergTable.builder().metadataLocation(newMetadataLocation).build();
+      client.getContentsApi().setContents(key,
+                                          reference.getAsBranch().getName(),
+                                          reference.getHash(),
+                                          String.format("iceberg commit%s", applicationId()),

Review comment:
       Good eye: the first character of `applicationId` is a newline, so I've put no space between `commit` and `%s` to avoid extra trailing whitespace in the message.
   
   Also note that the handling of commit messages in Nessie is still fairly primitive; this should get replaced by a structured object in the near future.
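   
   For anyone following along, the effect is roughly this (the value used for `applicationId()` below is made up; only the leading newline matters):
   
   ```java
   // applicationId() begins with a newline, so the format string omits the space:
   String appId = "\napplication_1603735762000_0001";            // illustrative value only
   
   String withSpace = String.format("iceberg commit %s", appId);
   // -> "iceberg commit \napplication_..."   (trailing space at the end of the first line)
   
   String withoutSpace = String.format("iceberg commit%s", appId);
   // -> "iceberg commit\napplication_..."    (no trailing whitespace)
   ```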

##########
File path: nessie/src/test/java/org/apache/iceberg/nessie/TestParsedTableIdentifier.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.nessie;
+
+import com.dremio.nessie.client.NessieClient;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestParsedTableIdentifier {
+
+
+  @Test
+  public void noMarkings() {
+    String path = "foo";
+    ParsedTableIdentifier pti = ParsedTableIdentifier.getParsedTableIdentifier(path, new HashMap<>());
+    Assert.assertEquals(path, pti.getTableIdentifier().name());
+    Assert.assertNull(pti.getReference());
+    Assert.assertNull(pti.getTimestamp());
+  }
+
+  @Test
+  public void branchOnly() {
+    String path = "foo@bar";
+    ParsedTableIdentifier pti = ParsedTableIdentifier.getParsedTableIdentifier(path, new HashMap<>());
+    Assert.assertEquals("foo", pti.getTableIdentifier().name());
+    Assert.assertEquals("bar", pti.getReference());
+    Assert.assertNull(pti.getTimestamp());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void timestampOnly() {
+    String path = "foo#baz";
+    ParsedTableIdentifier.getParsedTableIdentifier(path, new HashMap<>());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void branchAndTimestamp() {
+    String path = "foo@bar#baz";
+    ParsedTableIdentifier.getParsedTableIdentifier(path, new HashMap<>());
+  }
+
+  @Test(expected = IllegalArgumentException.class)

Review comment:
       fixed

##########
File path: nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.nessie;
+
+import com.dremio.nessie.client.NessieClient;
+import com.dremio.nessie.error.NessieConflictException;
+import com.dremio.nessie.error.NessieNotFoundException;
+import com.dremio.nessie.model.Contents;
+import com.dremio.nessie.model.ContentsKey;
+import com.dremio.nessie.model.IcebergTable;
+import com.dremio.nessie.model.ImmutableIcebergTable;
+import java.lang.reflect.Method;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.FileIO;
+
+/**
+ * Nessie implementation of Iceberg TableOperations.
+ */
+public class NessieTableOperations extends BaseMetastoreTableOperations {
+
+  private static Method sparkConfMethod;
+  private static Method appIdMethod;
+  private static Method sparkEnvMethod;
+
+  private final Configuration conf;
+  private final NessieClient client;
+  private final ContentsKey key;
+  private UpdateableReference reference;
+  private IcebergTable table;
+  private HadoopFileIO fileIO;
+
+  /**
+   * Create a nessie table operations given a table identifier.
+   */
+  public NessieTableOperations(
+      Configuration conf,
+      ContentsKey key,
+      UpdateableReference reference,
+      NessieClient client) {
+    this.conf = conf;
+    this.key = key;
+    this.reference = reference;
+    this.client = client;
+  }
+
+  @Override
+  protected void doRefresh() {
+    // break reference with parent (to avoid cross-over refresh)
+    // TODO, confirm this is correct behavior.
+    // reference = reference.copy();
+
+    reference.refresh();
+    String metadataLocation = null;
+    try {
+      Contents contents = client.getContentsApi().getContents(key, reference.getHash());
+      this.table = contents.unwrap(IcebergTable.class)
+          .orElseThrow(() -> new IllegalStateException("Nessie points to a non-Iceberg object for that path."));
+      metadataLocation = table.getMetadataLocation();
+    } catch (NessieNotFoundException ex) {
+      this.table = null;
+    }
+    refreshFromMetadataLocation(metadataLocation, 2);
+  }
+
+  @Override
+  protected void doCommit(TableMetadata base, TableMetadata metadata) {
+    reference.checkMutable();
+
+    String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);
+
+    try {
+      IcebergTable newTable = ImmutableIcebergTable.builder().metadataLocation(newMetadataLocation).build();
+      client.getContentsApi().setContents(key,
+                                          reference.getAsBranch().getName(),
+                                          reference.getHash(),
+                                          String.format("iceberg commit%s", applicationId()),
+                                          newTable);
+    } catch (NessieNotFoundException | NessieConflictException ex) {

Review comment:
       Good eye; I've cleaned up the exception message and improved how the exceptions are thrown.



