You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/12/04 13:36:13 UTC

[GitHub] [iceberg] rymurr opened a new pull request #1875: Allow Spark2 DataFrame to use a custom catalog

rymurr opened a new pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875


   This is the spark 2 equivalent of #1783 and enables Spark2 data frames to use a custom catalog by setting reader/writer options or via Spark conf.
   
   The spark conf are in keeping with Spark3 catalog params and are prefixed by `spark.sql.catalog.iceberg.`. These parameters correspond to the settings needed for a custom catalog named `iceberg` in Spark3


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547547165



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+
+public final class CustomCatalogs {
+  private static final Cache<Pair<SparkSession, String>, Catalog> CATALOG_CACHE = Caffeine.newBuilder()
+      .softValues().build();
+
+  public static final String ICEBERG_DEFAULT_CATALOG = "default_catalog";
+  public static final String ICEBERG_CATALOG_PREFIX = "spark.sql.catalog";
+
+  private CustomCatalogs() {
+  }
+
+  /**
+   * Build an Iceberg {@link Catalog} to be used by this Spark source adapter.
+   *
+   * <p>
+   * The cache is to facilitate reuse of catalogs, especially if wrapped in CachingCatalog. For non-Hive catalogs all
+   * custom parameters passed to the catalog are considered in the cache key. Hive catalogs only cache based on
+   * the Metastore URIs as per previous behaviour.
+   *
+   *
+   * @param spark Spark Session
+   * @param name Catalog Name
+   * @return an Iceberg catalog
+   */
+  public static Catalog buildIcebergCatalog(SparkSession spark, String name) {
+    return CATALOG_CACHE.get(Pair.of(spark, name), CustomCatalogs::load);
+  }
+
+  private static Catalog load(Pair<SparkSession, String> sparkAndName) {
+    SparkSession spark = sparkAndName.first();
+    String name = sparkAndName.second();
+    SparkConf sparkConf = spark.sparkContext().getConf();
+    Configuration conf = spark.sessionState().newHadoopConf();
+
+    String catalogPrefix = String.format("%s.%s", ICEBERG_CATALOG_PREFIX, name);
+    String catalogName = sparkConf.get(catalogPrefix, null);
+    if (!name.equals(ICEBERG_DEFAULT_CATALOG) &&
+        !org.apache.spark.sql.catalog.Catalog.class.getName().equals(catalogName)) {

Review comment:
       When I suggested using `SparkCatalog` earlier, I didn't think about how it isn't defined for 2.4. Instead of this check, let's just check whether the catalog property is defined at all. As long as `catalogName` is non-null, it is a catalog for the purposes of this config. If the catalog doesn't have a valid `type` or implementation class then loading it will fail.
   
   Also, it would be `catalogImpl` not name because `name` is the catalog name.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rymurr commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rymurr commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547997210



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+
+public final class CustomCatalogs {
+  private static final Cache<Pair<SparkSession, String>, Catalog> CATALOG_CACHE = Caffeine.newBuilder().softValues().build();
+
+  public static final String ICEBERG_DEFAULT_CATALOG = "default_catalog";
+  public static final String ICEBERG_CATALOG_PREFIX = "spark.sql.catalog";
+
+  private CustomCatalogs() {
+  }
+
+  /**
+   * Build an Iceberg {@link Catalog} to be used by this Spark source adapter.
+   *
+   * <p>
+   *   The cache is to facilitate reuse of catalogs, especially if wrapped in CachingCatalog. For non-Hive catalogs all
+   *   custom parameters passed to the catalog are considered in the cache key. Hive catalogs only cache based on
+   *   the Metastore URIs as per previous behaviour.
+   * </p>
+   *
+   * @param spark Spark Session
+   * @param name Catalog Name
+   * @return an Iceberg catalog
+   */
+  public static Catalog buildIcebergCatalog(SparkSession spark, String name) {

Review comment:
       ahh, yup. Agreed. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r549511571



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -61,4 +65,42 @@ public static void validatePartitionTransforms(PartitionSpec spec) {
           String.format("Cannot write using unsupported transforms: %s", unsupported));
     }
   }
+
+  /**
+   * A modified version of Spark's LookupCatalog.CatalogAndIdentifier.unapply
+   * Attempts to find the catalog and identifier a multipart identifier represents
+   * @param nameParts Multipart identifier representing a table
+   * @return The CatalogPlugin and Identifier for the table
+   */
+  public static <C, T> Pair<C, T> catalogAndIdentifier(List<String> nameParts,
+                                                       Function<String, C> catalogProvider,
+                                                       IdentiferFunction<T> identiferProvider,
+                                                       C defaultCatalog,

Review comment:
       I think it's okay. The logic is correct.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547547165



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+
+public final class CustomCatalogs {
+  private static final Cache<Pair<SparkSession, String>, Catalog> CATALOG_CACHE = Caffeine.newBuilder()
+      .softValues().build();
+
+  public static final String ICEBERG_DEFAULT_CATALOG = "default_catalog";
+  public static final String ICEBERG_CATALOG_PREFIX = "spark.sql.catalog";
+
+  private CustomCatalogs() {
+  }
+
+  /**
+   * Build an Iceberg {@link Catalog} to be used by this Spark source adapter.
+   *
+   * <p>
+   * The cache is to facilitate reuse of catalogs, especially if wrapped in CachingCatalog. For non-Hive catalogs all
+   * custom parameters passed to the catalog are considered in the cache key. Hive catalogs only cache based on
+   * the Metastore URIs as per previous behaviour.
+   *
+   *
+   * @param spark Spark Session
+   * @param name Catalog Name
+   * @return an Iceberg catalog
+   */
+  public static Catalog buildIcebergCatalog(SparkSession spark, String name) {
+    return CATALOG_CACHE.get(Pair.of(spark, name), CustomCatalogs::load);
+  }
+
+  private static Catalog load(Pair<SparkSession, String> sparkAndName) {
+    SparkSession spark = sparkAndName.first();
+    String name = sparkAndName.second();
+    SparkConf sparkConf = spark.sparkContext().getConf();
+    Configuration conf = spark.sessionState().newHadoopConf();
+
+    String catalogPrefix = String.format("%s.%s", ICEBERG_CATALOG_PREFIX, name);
+    String catalogName = sparkConf.get(catalogPrefix, null);
+    if (!name.equals(ICEBERG_DEFAULT_CATALOG) &&
+        !org.apache.spark.sql.catalog.Catalog.class.getName().equals(catalogName)) {

Review comment:
       When I suggested using `SparkCatalog` earlier, I didn't think about how it isn't defined for 2.4. Instead of this check, let's just check whether the catalog property is defined at all. As long as `catalogName` is non-null, it is a catalog for the purposes of this config.
   
   Also, it would be `catalogImpl` not name because `name` is the catalog name.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547546276



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -61,4 +65,42 @@ public static void validatePartitionTransforms(PartitionSpec spec) {
           String.format("Cannot write using unsupported transforms: %s", unsupported));
     }
   }
+
+  /**
+   * A modified version of Spark's LookupCatalog.CatalogAndIdentifier.unapply
+   * Attempts to find the catalog and identifier a multipart identifier represents
+   * @param nameParts Multipart identifier representing a table
+   * @return The CatalogPlugin and Identifier for the table
+   */
+  public static <C, T> Pair<C, T> catalogAndIdentifier(List<String> nameParts,
+                                                       Function<String, C> catalogProvider,
+                                                       IdentiferFunction<T> identiferProvider,
+                                                       C defaultCatalog,
+                                                       String[] currentNamespace) {
+    Preconditions.checkArgument(!nameParts.isEmpty(),
+        "Cannot determine catalog and Identifier from empty name parts");
+
+    int lastElementIndex = nameParts.size() - 1;
+    String name = nameParts.get(lastElementIndex);
+
+    if (nameParts.size() == 1) {
+      // Only a single element, use current catalog and namespace
+      return Pair.of(defaultCatalog, identiferProvider.of(currentNamespace, name));
+    } else {
+      C catalog = catalogProvider.apply(nameParts.get(0));
+      if (catalog == null) {
+        // The first element was not a valid catalog, treat it like part of the namespace
+        String[] namespace =  nameParts.subList(0, lastElementIndex).toArray(new String[0]);
+        return Pair.of(defaultCatalog, identiferProvider.of(namespace, name));
+      } else {
+        // Assume the first element is a valid catalog
+        String[] namespace = nameParts.subList(1, lastElementIndex).toArray(new String[0]);
+        return Pair.of(catalog, identiferProvider.of(namespace, name));
+      }
+    }
+  }
+
+  public interface IdentiferFunction<T> {

Review comment:
       This could be `BiFunction<String[], String, T>`. Then you wouldn't need a separate interface for it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547444324



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+
+public final class CustomCatalogs {
+  private static final Cache<Pair<SparkSession, String>, Catalog> CATALOG_CACHE = Caffeine.newBuilder().softValues().build();
+
+  public static final String ICEBERG_DEFAULT_CATALOG = "default_catalog";
+  public static final String ICEBERG_CATALOG_PREFIX = "spark.sql.catalog";
+
+  private CustomCatalogs() {
+  }
+
+  /**
+   * Build an Iceberg {@link Catalog} to be used by this Spark source adapter.
+   *
+   * <p>
+   *   The cache is to facilitate reuse of catalogs, especially if wrapped in CachingCatalog. For non-Hive catalogs all
+   *   custom parameters passed to the catalog are considered in the cache key. Hive catalogs only cache based on
+   *   the Metastore URIs as per previous behaviour.
+   * </p>
+   *
+   * @param spark Spark Session
+   * @param name Catalog Name
+   * @return an Iceberg catalog
+   */
+  public static Catalog buildIcebergCatalog(SparkSession spark, String name) {
+    return CATALOG_CACHE.get(Pair.of(spark, name), CustomCatalogs::build);
+  }
+
+  private static Catalog build(Pair<SparkSession, String> sparkAndName) {
+    SparkSession spark = sparkAndName.first();
+    String name =  sparkAndName.second() == null ? ICEBERG_DEFAULT_CATALOG : sparkAndName.second();
+    SparkConf sparkConf = spark.sparkContext().getConf();
+    Configuration conf = spark.sessionState().newHadoopConf();
+
+    String catalogPrefix = String.format("%s.%s.", ICEBERG_CATALOG_PREFIX, name);
+    Map<String, String> options = Arrays.stream(sparkConf.getAllWithPrefix(catalogPrefix))
+        .collect(Collectors.toMap(x -> x._1, x -> x._2));
+
+    if (options.isEmpty() && !name.equals(ICEBERG_DEFAULT_CATALOG)) {
+      throw new IllegalArgumentException(String.format("Cannot instantiate catalog %s. Incorrect Parameters", name));

Review comment:
       I don't think I quite understand you here.
   
   In Spark, a catalog exists if `spark.sql.catalog.name` is present and is a class that can be loaded. I would expect this method to return null if that property isn't the case. For Spark 2.4, I think it would be fine to assume that the class will be `SparkCatalog.class.getName()`. If it isn't, then return null. If it is, then the catalog exists and this should load the `IcebergCatalog` that would be used by `SparkCatalog` in Spark 3.x. Does that make sense?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r543520836



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -61,4 +65,40 @@ public static void validatePartitionTransforms(PartitionSpec spec) {
           String.format("Cannot write using unsupported transforms: %s", unsupported));
     }
   }
+
+  /**
+   * A modified version of Spark's LookupCatalog.CatalogAndIdentifier.unapply
+   * Attempts to find the catalog and identifier a multipart identifier represents
+   * @param nameParts Multipart identifier representing a table
+   * @return The CatalogPlugin and Identifier for the table
+   */
+  public static <C, T> Pair<C,T> catalogAndIdentifier(List<String> nameParts,
+                                                       Function<String, C> catalog,
+                                                       IdentiferFunction<T> identifer,
+                                                       String[] currentNamespace) {

Review comment:
       Nit: indentation.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#issuecomment-749824691


   Thanks for the update! Looks almost ready now.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rymurr commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rymurr commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547524941



##########
File path: core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java
##########
@@ -68,45 +70,62 @@
  *
  * Note: The HadoopCatalog requires that the underlying file system supports atomic rename.
  */
-public class HadoopCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces {
+public class HadoopCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
 
   private static final String ICEBERG_HADOOP_WAREHOUSE_BASE = "iceberg/warehouse";
   private static final String TABLE_METADATA_FILE_EXTENSION = ".metadata.json";
   private static final Joiner SLASH = Joiner.on("/");
   private static final PathFilter TABLE_FILTER = path -> path.getName().endsWith(TABLE_METADATA_FILE_EXTENSION);
 
-  private final String catalogName;
-  private final Configuration conf;
-  private final String warehouseLocation;
-  private final FileSystem fs;
-  private final FileIO fileIO;
+  private String catalogName;
+  private Configuration conf;
+  private String warehouseLocation;
+  private FileSystem fs;
+  private FileIO fileIO;
+
+  public HadoopCatalog(){
+  }
 
   /**
    * The constructor of the HadoopCatalog. It uses the passed location as its warehouse directory.
    *
+   * @deprecated please use the no-arg constructor, setConf and initialize to construct the catalog
    * @param name The catalog name
    * @param conf The Hadoop configuration
    * @param warehouseLocation The location used as warehouse directory
    */
+  @Deprecated
   public HadoopCatalog(String name, Configuration conf, String warehouseLocation) {
     this(name, conf, warehouseLocation, Maps.newHashMap());
   }
 
   /**
    * The all-arg constructor of the HadoopCatalog.
    *
+   * @deprecated please use the no-arg constructor, setConf and initialize to construct the catalog
    * @param name The catalog name
    * @param conf The Hadoop configuration
    * @param warehouseLocation The location used as warehouse directory
    * @param properties catalog properties
    */
+  @Deprecated
   public HadoopCatalog(String name, Configuration conf, String warehouseLocation, Map<String, String> properties) {
     Preconditions.checkArgument(warehouseLocation != null && !warehouseLocation.equals(""),
         "no location provided for warehouse");
+    setConf(conf);
+    Map<String, String> props = new HashMap<>(properties);
+    props.putAll(properties);

Review comment:
       sigh...dumb error. Fixed

##########
File path: core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java
##########
@@ -68,45 +70,62 @@
  *
  * Note: The HadoopCatalog requires that the underlying file system supports atomic rename.
  */
-public class HadoopCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces {
+public class HadoopCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
 
   private static final String ICEBERG_HADOOP_WAREHOUSE_BASE = "iceberg/warehouse";
   private static final String TABLE_METADATA_FILE_EXTENSION = ".metadata.json";
   private static final Joiner SLASH = Joiner.on("/");
   private static final PathFilter TABLE_FILTER = path -> path.getName().endsWith(TABLE_METADATA_FILE_EXTENSION);
 
-  private final String catalogName;
-  private final Configuration conf;
-  private final String warehouseLocation;
-  private final FileSystem fs;
-  private final FileIO fileIO;
+  private String catalogName;
+  private Configuration conf;
+  private String warehouseLocation;
+  private FileSystem fs;
+  private FileIO fileIO;
+
+  public HadoopCatalog(){
+  }
 
   /**
    * The constructor of the HadoopCatalog. It uses the passed location as its warehouse directory.
    *
+   * @deprecated please use the no-arg constructor, setConf and initialize to construct the catalog
    * @param name The catalog name
    * @param conf The Hadoop configuration
    * @param warehouseLocation The location used as warehouse directory
    */
+  @Deprecated
   public HadoopCatalog(String name, Configuration conf, String warehouseLocation) {
     this(name, conf, warehouseLocation, Maps.newHashMap());
   }
 
   /**
    * The all-arg constructor of the HadoopCatalog.
    *
+   * @deprecated please use the no-arg constructor, setConf and initialize to construct the catalog
    * @param name The catalog name
    * @param conf The Hadoop configuration
    * @param warehouseLocation The location used as warehouse directory
    * @param properties catalog properties
    */
+  @Deprecated
   public HadoopCatalog(String name, Configuration conf, String warehouseLocation, Map<String, String> properties) {
     Preconditions.checkArgument(warehouseLocation != null && !warehouseLocation.equals(""),
         "no location provided for warehouse");
+    setConf(conf);
+    Map<String, String> props = new HashMap<>(properties);
+    props.putAll(properties);
+    props.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);
+    initialize(name, props);
+  }
 
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    String inputWarehouseLocation = properties.get(CatalogProperties.WAREHOUSE_LOCATION);
+    Preconditions.checkArgument(inputWarehouseLocation != null && !inputWarehouseLocation.equals(""),
+        "no location provided for warehouse");

Review comment:
       sure, copy/paste...I have now fixed both instances

##########
File path: core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java
##########
@@ -68,45 +70,62 @@
  *
  * Note: The HadoopCatalog requires that the underlying file system supports atomic rename.
  */
-public class HadoopCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces {
+public class HadoopCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
 
   private static final String ICEBERG_HADOOP_WAREHOUSE_BASE = "iceberg/warehouse";
   private static final String TABLE_METADATA_FILE_EXTENSION = ".metadata.json";
   private static final Joiner SLASH = Joiner.on("/");
   private static final PathFilter TABLE_FILTER = path -> path.getName().endsWith(TABLE_METADATA_FILE_EXTENSION);
 
-  private final String catalogName;
-  private final Configuration conf;
-  private final String warehouseLocation;
-  private final FileSystem fs;
-  private final FileIO fileIO;
+  private String catalogName;
+  private Configuration conf;
+  private String warehouseLocation;
+  private FileSystem fs;
+  private FileIO fileIO;
+
+  public HadoopCatalog(){
+  }
 
   /**
    * The constructor of the HadoopCatalog. It uses the passed location as its warehouse directory.
    *
+   * @deprecated please use the no-arg constructor, setConf and initialize to construct the catalog
    * @param name The catalog name
    * @param conf The Hadoop configuration
    * @param warehouseLocation The location used as warehouse directory
    */
+  @Deprecated
   public HadoopCatalog(String name, Configuration conf, String warehouseLocation) {
     this(name, conf, warehouseLocation, Maps.newHashMap());
   }
 
   /**
    * The all-arg constructor of the HadoopCatalog.
    *
+   * @deprecated please use the no-arg constructor, setConf and initialize to construct the catalog
    * @param name The catalog name
    * @param conf The Hadoop configuration
    * @param warehouseLocation The location used as warehouse directory
    * @param properties catalog properties
    */
+  @Deprecated
   public HadoopCatalog(String name, Configuration conf, String warehouseLocation, Map<String, String> properties) {
     Preconditions.checkArgument(warehouseLocation != null && !warehouseLocation.equals(""),
         "no location provided for warehouse");
+    setConf(conf);

Review comment:
       In catalog util `setConf` is called first. I think everywhere `initialize` is called `setConf` is called before

##########
File path: core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java
##########
@@ -68,45 +70,62 @@
  *
  * Note: The HadoopCatalog requires that the underlying file system supports atomic rename.
  */
-public class HadoopCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces {
+public class HadoopCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
 
   private static final String ICEBERG_HADOOP_WAREHOUSE_BASE = "iceberg/warehouse";
   private static final String TABLE_METADATA_FILE_EXTENSION = ".metadata.json";
   private static final Joiner SLASH = Joiner.on("/");
   private static final PathFilter TABLE_FILTER = path -> path.getName().endsWith(TABLE_METADATA_FILE_EXTENSION);
 
-  private final String catalogName;
-  private final Configuration conf;
-  private final String warehouseLocation;
-  private final FileSystem fs;
-  private final FileIO fileIO;
+  private String catalogName;
+  private Configuration conf;
+  private String warehouseLocation;
+  private FileSystem fs;
+  private FileIO fileIO;
+
+  public HadoopCatalog(){
+  }
 
   /**
    * The constructor of the HadoopCatalog. It uses the passed location as its warehouse directory.
    *
+   * @deprecated please use the no-arg constructor, setConf and initialize to construct the catalog
    * @param name The catalog name
    * @param conf The Hadoop configuration
    * @param warehouseLocation The location used as warehouse directory
    */
+  @Deprecated
   public HadoopCatalog(String name, Configuration conf, String warehouseLocation) {
     this(name, conf, warehouseLocation, Maps.newHashMap());
   }
 
   /**
    * The all-arg constructor of the HadoopCatalog.
    *
+   * @deprecated please use the no-arg constructor, setConf and initialize to construct the catalog

Review comment:
       done

##########
File path: core/src/main/java/org/apache/iceberg/CatalogUtil.java
##########
@@ -169,6 +175,26 @@ public static Catalog loadCatalog(
     return catalog;
   }
 
+  public static Catalog buildIcebergCatalog(String name, Map<String, String> options, Configuration conf) {
+
+    String catalogImpl = options.get(CatalogProperties.CATALOG_IMPL);
+    if (catalogImpl == null) {
+      String catalogType = options.getOrDefault(ICEBERG_CATALOG_TYPE, ICEBERG_CATALOG_TYPE_HIVE);
+      switch (catalogType.toLowerCase(Locale.ENGLISH)) {
+        case ICEBERG_CATALOG_TYPE_HIVE:
+          catalogImpl = ICEBERG_CATALOG_HIVE;
+          break;
+        case ICEBERG_CATALOG_TYPE_HADOOP:
+          catalogImpl = ICEBERG_CATALOG_HADOOP;
+          break;
+        default:
+          throw new UnsupportedOperationException("Unknown catalog type: " + catalogType);
+      }
+    }
+    return CatalogUtil.loadCatalog(catalogImpl, name, options, conf);
+

Review comment:
       done

##########
File path: core/src/main/java/org/apache/iceberg/CatalogUtil.java
##########
@@ -169,6 +175,26 @@ public static Catalog loadCatalog(
     return catalog;
   }
 
+  public static Catalog buildIcebergCatalog(String name, Map<String, String> options, Configuration conf) {
+

Review comment:
       done

##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -61,4 +65,40 @@ public static void validatePartitionTransforms(PartitionSpec spec) {
           String.format("Cannot write using unsupported transforms: %s", unsupported));
     }
   }
+
+  /**
+   * A modified version of Spark's LookupCatalog.CatalogAndIdentifier.unapply
+   * Attempts to find the catalog and identifier a multipart identifier represents
+   * @param nameParts Multipart identifier representing a table
+   * @return The CatalogPlugin and Identifier for the table
+   */
+  public static <C, T> Pair<C, T> catalogAndIdentifier(List<String> nameParts,
+                                                       Function<String, C> catalog,
+                                                       IdentiferFunction<T> identifer,
+                                                       String[] currentNamespace) {
+    Preconditions.checkArgument(!nameParts.isEmpty(),
+        "Cannot determine catalog and Identifier from empty name parts");
+
+    int lastElementIndex = nameParts.size() - 1;
+    String name = nameParts.get(lastElementIndex);
+
+    if (nameParts.size() == 1) {
+      // Only a single element, use current catalog and namespace
+      return Pair.of(catalog.apply(null), identifer.of(currentNamespace, name));
+    } else {
+      try {

Review comment:
       done

##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -61,4 +65,40 @@ public static void validatePartitionTransforms(PartitionSpec spec) {
           String.format("Cannot write using unsupported transforms: %s", unsupported));
     }
   }
+
+  /**
+   * A modified version of Spark's LookupCatalog.CatalogAndIdentifier.unapply
+   * Attempts to find the catalog and identifier a multipart identifier represents
+   * @param nameParts Multipart identifier representing a table
+   * @return The CatalogPlugin and Identifier for the table
+   */
+  public static <C, T> Pair<C, T> catalogAndIdentifier(List<String> nameParts,

Review comment:
       This was originally meant to return the default catalog if the catalog function got a null. Have now adjusted to add a default catalog parameter and have modified Spark3Util to use this function also. Ensuring they have the same behaviour.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547447774



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -61,4 +65,40 @@ public static void validatePartitionTransforms(PartitionSpec spec) {
           String.format("Cannot write using unsupported transforms: %s", unsupported));
     }
   }
+
+  /**
+   * A modified version of Spark's LookupCatalog.CatalogAndIdentifier.unapply
+   * Attempts to find the catalog and identifier a multipart identifier represents
+   * @param nameParts Multipart identifier representing a table
+   * @return The CatalogPlugin and Identifier for the table
+   */
+  public static <C, T> Pair<C, T> catalogAndIdentifier(List<String> nameParts,

Review comment:
       I think that the logic here should be identical to the Spark 3 case, but with the catalog load function and identifier construction replaced. That doesn't appear to be what is done because `catalog.apply` is used when the catalog is not set (1 part name).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rymurr commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rymurr commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547525143



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
##########
@@ -101,27 +94,7 @@
    */
   protected Catalog buildIcebergCatalog(String name, CaseInsensitiveStringMap options) {
     Configuration conf = SparkSession.active().sessionState().newHadoopConf();
-
-    String catalogImpl = options.get(CatalogProperties.CATALOG_IMPL);
-    if (catalogImpl != null) {
-      return CatalogUtil.loadCatalog(catalogImpl, name, options, conf);
-    }
-
-    String catalogType = options.getOrDefault(ICEBERG_CATALOG_TYPE, ICEBERG_CATALOG_TYPE_HIVE);
-    switch (catalogType.toLowerCase(Locale.ENGLISH)) {
-      case ICEBERG_CATALOG_TYPE_HIVE:
-        int clientPoolSize = options.getInt(CatalogProperties.HIVE_CLIENT_POOL_SIZE,
-            CatalogProperties.HIVE_CLIENT_POOL_SIZE_DEFAULT);
-        String uri = options.get(CatalogProperties.HIVE_URI);
-        return new HiveCatalog(name, uri, clientPoolSize, conf);
-
-      case ICEBERG_CATALOG_TYPE_HADOOP:
-        String warehouseLocation = options.get(CatalogProperties.WAREHOUSE_LOCATION);
-        return new HadoopCatalog(name, conf, warehouseLocation, options.asCaseSensitiveMap());
-
-      default:
-        throw new UnsupportedOperationException("Unknown catalog type: " + catalogType);
-    }
+    return CatalogUtil.buildIcebergCatalog(name, options.asCaseSensitiveMap(), conf);

Review comment:
       good catch, fixed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r541273746



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.hive.HiveCatalogs;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+
+public final class CustomCatalogs {
+  private static final Cache<String, Catalog> CATALOG_CACHE = Caffeine.newBuilder().softValues().build();
+
+  public static final String ICEBERG_CATALOG_PREFIX = "spark.sql.catalog.iceberg.";
+  public static final String ICEBERG_CATALOG_TYPE = "type";
+  public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
+  public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
+
+  private CustomCatalogs() {
+  }
+
+  /**
+   * Build an Iceberg {@link Catalog} to be used by this Spark source adapter.
+   *

Review comment:
       New paragraphs in Javadoc require `<p>`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rymurr commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rymurr commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547995875



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+
+public final class CustomCatalogs {
+  private static final Cache<Pair<SparkSession, String>, Catalog> CATALOG_CACHE = Caffeine.newBuilder()
+      .softValues().build();
+
+  public static final String ICEBERG_DEFAULT_CATALOG = "default_catalog";
+  public static final String ICEBERG_CATALOG_PREFIX = "spark.sql.catalog";
+
+  private CustomCatalogs() {
+  }
+
+  /**
+   * Build an Iceberg {@link Catalog} to be used by this Spark source adapter.
+   *
+   * <p>
+   * The cache is to facilitate reuse of catalogs, especially if wrapped in CachingCatalog. For non-Hive catalogs all
+   * custom parameters passed to the catalog are considered in the cache key. Hive catalogs only cache based on
+   * the Metastore URIs as per previous behaviour.
+   *
+   *
+   * @param spark Spark Session
+   * @param name Catalog Name
+   * @return an Iceberg catalog
+   */
+  public static Catalog buildIcebergCatalog(SparkSession spark, String name) {
+    return CATALOG_CACHE.get(Pair.of(spark, name), CustomCatalogs::load);
+  }
+
+  private static Catalog load(Pair<SparkSession, String> sparkAndName) {
+    SparkSession spark = sparkAndName.first();
+    String name = sparkAndName.second();
+    SparkConf sparkConf = spark.sparkContext().getConf();
+    Configuration conf = spark.sessionState().newHadoopConf();
+
+    String catalogPrefix = String.format("%s.%s", ICEBERG_CATALOG_PREFIX, name);
+    String catalogName = sparkConf.get(catalogPrefix, null);
+    if (!name.equals(ICEBERG_DEFAULT_CATALOG) &&
+        !org.apache.spark.sql.catalog.Catalog.class.getName().equals(catalogName)) {

Review comment:
       cool, changed to an existence check.
   
   > Also, it would be catalogImpl not name because name is the catalog name.
   
   Not sure what you mean wrt the `catalogImpl` comment?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rymurr commented on pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rymurr commented on pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#issuecomment-744642301


   thanks for the review @rdblue 
   
   I have gone fairly extreme with respects to code reuse in this update. I thought I would get some input on the direction/design before I clean up and fix the build.
   
   Let me know what you think!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r549509358



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -61,4 +65,42 @@ public static void validatePartitionTransforms(PartitionSpec spec) {
           String.format("Cannot write using unsupported transforms: %s", unsupported));
     }
   }
+
+  /**
+   * A modified version of Spark's LookupCatalog.CatalogAndIdentifier.unapply
+   * Attempts to find the catalog and identifier a multipart identifier represents
+   * @param nameParts Multipart identifier representing a table
+   * @return The CatalogPlugin and Identifier for the table
+   */
+  public static <C, T> Pair<C, T> catalogAndIdentifier(List<String> nameParts,
+                                                       Function<String, C> catalogProvider,
+                                                       IdentiferFunction<T> identiferProvider,
+                                                       C defaultCatalog,
+                                                       String[] currentNamespace) {
+    Preconditions.checkArgument(!nameParts.isEmpty(),
+        "Cannot determine catalog and Identifier from empty name parts");
+
+    int lastElementIndex = nameParts.size() - 1;
+    String name = nameParts.get(lastElementIndex);
+
+    if (nameParts.size() == 1) {
+      // Only a single element, use current catalog and namespace
+      return Pair.of(defaultCatalog, identiferProvider.of(currentNamespace, name));
+    } else {
+      C catalog = catalogProvider.apply(nameParts.get(0));
+      if (catalog == null) {
+        // The first element was not a valid catalog, treat it like part of the namespace
+        String[] namespace =  nameParts.subList(0, lastElementIndex).toArray(new String[0]);

Review comment:
       Right, `toIndex` is exclusive, so this removes the last element that is passed as the name. Thanks!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547544750



##########
File path: core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java
##########
@@ -68,45 +70,62 @@
  *
  * Note: The HadoopCatalog requires that the underlying file system supports atomic rename.
  */
-public class HadoopCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces {
+public class HadoopCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
 
   private static final String ICEBERG_HADOOP_WAREHOUSE_BASE = "iceberg/warehouse";
   private static final String TABLE_METADATA_FILE_EXTENSION = ".metadata.json";
   private static final Joiner SLASH = Joiner.on("/");
   private static final PathFilter TABLE_FILTER = path -> path.getName().endsWith(TABLE_METADATA_FILE_EXTENSION);
 
-  private final String catalogName;
-  private final Configuration conf;
-  private final String warehouseLocation;
-  private final FileSystem fs;
-  private final FileIO fileIO;
+  private String catalogName;
+  private Configuration conf;
+  private String warehouseLocation;
+  private FileSystem fs;
+  private FileIO fileIO;
+
+  public HadoopCatalog(){
+  }
 
   /**
    * The constructor of the HadoopCatalog. It uses the passed location as its warehouse directory.
    *
+   * @deprecated please use the no-arg constructor, setConf and initialize to construct the catalog
    * @param name The catalog name
    * @param conf The Hadoop configuration
    * @param warehouseLocation The location used as warehouse directory
    */
+  @Deprecated
   public HadoopCatalog(String name, Configuration conf, String warehouseLocation) {
     this(name, conf, warehouseLocation, Maps.newHashMap());
   }
 
   /**
    * The all-arg constructor of the HadoopCatalog.
    *
+   * @deprecated please use the no-arg constructor, setConf and initialize to construct the catalog
    * @param name The catalog name
    * @param conf The Hadoop configuration
    * @param warehouseLocation The location used as warehouse directory
    * @param properties catalog properties
    */
+  @Deprecated
   public HadoopCatalog(String name, Configuration conf, String warehouseLocation, Map<String, String> properties) {
     Preconditions.checkArgument(warehouseLocation != null && !warehouseLocation.equals(""),
         "no location provided for warehouse");
+    setConf(conf);

Review comment:
       Okay, I was wrong then. The important thing is being consistent so you've already done that. :)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547545154



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -61,4 +65,42 @@ public static void validatePartitionTransforms(PartitionSpec spec) {
           String.format("Cannot write using unsupported transforms: %s", unsupported));
     }
   }
+
+  /**
+   * A modified version of Spark's LookupCatalog.CatalogAndIdentifier.unapply
+   * Attempts to find the catalog and identifier a multipart identifier represents
+   * @param nameParts Multipart identifier representing a table
+   * @return The CatalogPlugin and Identifier for the table
+   */
+  public static <C, T> Pair<C, T> catalogAndIdentifier(List<String> nameParts,
+                                                       Function<String, C> catalogProvider,
+                                                       IdentiferFunction<T> identiferProvider,
+                                                       C defaultCatalog,

Review comment:
       Current catalog? I think it would be current catalog and current namespace.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rymurr commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rymurr commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547993520



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -61,4 +65,42 @@ public static void validatePartitionTransforms(PartitionSpec spec) {
           String.format("Cannot write using unsupported transforms: %s", unsupported));
     }
   }
+
+  /**
+   * A modified version of Spark's LookupCatalog.CatalogAndIdentifier.unapply
+   * Attempts to find the catalog and identifier a multipart identifier represents
+   * @param nameParts Multipart identifier representing a table
+   * @return The CatalogPlugin and Identifier for the table
+   */
+  public static <C, T> Pair<C, T> catalogAndIdentifier(List<String> nameParts,
+                                                       Function<String, C> catalogProvider,
+                                                       IdentiferFunction<T> identiferProvider,
+                                                       C defaultCatalog,
+                                                       String[] currentNamespace) {
+    Preconditions.checkArgument(!nameParts.isEmpty(),
+        "Cannot determine catalog and Identifier from empty name parts");
+
+    int lastElementIndex = nameParts.size() - 1;
+    String name = nameParts.get(lastElementIndex);
+
+    if (nameParts.size() == 1) {
+      // Only a single element, use current catalog and namespace
+      return Pair.of(defaultCatalog, identiferProvider.of(currentNamespace, name));
+    } else {
+      C catalog = catalogProvider.apply(nameParts.get(0));
+      if (catalog == null) {
+        // The first element was not a valid catalog, treat it like part of the namespace
+        String[] namespace =  nameParts.subList(0, lastElementIndex).toArray(new String[0]);

Review comment:
       Good point, just copied from `Spark3Util` but it doesn't make much sense.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rymurr commented on pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rymurr commented on pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#issuecomment-745286527


   I had another look at this, I am not super happy with the way the catalog name is chosen nor with the way the Hive catalog is instantiated in `CatalogUtil`. However there is very little code re-use and the Spark2 behaviour will be very similar to Spark3 wrt catalogs and identifiers. The surface area of this PR also has decreased (most of it is moving methods around).


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r541278363



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.hive.HiveCatalogs;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+
+public final class CustomCatalogs {
+  private static final Cache<String, Catalog> CATALOG_CACHE = Caffeine.newBuilder().softValues().build();
+
+  public static final String ICEBERG_CATALOG_PREFIX = "spark.sql.catalog.iceberg.";
+  public static final String ICEBERG_CATALOG_TYPE = "type";
+  public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
+  public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
+
+  private CustomCatalogs() {
+  }
+
+  /**
+   * Build an Iceberg {@link Catalog} to be used by this Spark source adapter.
+   *
+   * The cache is to facilitate reuse of catalogs, especially if wrapped in CachingCatalog. For non-Hive catalogs all
+   * custom parameters passed to the catalog are considered in the cache key. Hive catalogs only cache based on
+   * the Metastore URIs as per previous behaviour.
+   *
+   * @param options options from Spark
+   * @return an Iceberg catalog
+   */
+  public static Catalog buildIcebergCatalog(Map<String, String> options) {
+    String name = "spark_source";
+    SparkConf sparkConf = SparkSession.active().sparkContext().getConf();
+    Map<String, String> sparkMap = Arrays.stream(sparkConf.getAllWithPrefix(ICEBERG_CATALOG_PREFIX))
+        .collect(Collectors.toMap(x -> x._1, x -> x._2));
+    sparkMap.putAll(options);
+    Configuration conf = SparkSession.active().sessionState().newHadoopConf();
+
+    String catalogImpl = sparkMap.get(CatalogProperties.CATALOG_IMPL);
+    if (catalogImpl != null) {
+      String cacheKey = options.entrySet()
+          .stream().map(x -> String.format("%s:%s", x.getKey(), x.getValue())).collect(Collectors.joining(";"));
+      return CATALOG_CACHE.get(cacheKey, x -> CatalogUtil.loadCatalog(catalogImpl, name, sparkMap, conf));

Review comment:
       I would expect the cache to delegate to `buildIcebergCatalog`, no the other way around. What about using a simple getter:
   
   ```java
   public static Catalog loadCatalog(SparkSession spark, String name) {
     return CATALOG_CACHE.get(Pair.of(spark, name));
   }
   ```
   
   Then this just needs to build the named catalog for a particular Spark session.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rymurr commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rymurr commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547992567



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -61,4 +65,42 @@ public static void validatePartitionTransforms(PartitionSpec spec) {
           String.format("Cannot write using unsupported transforms: %s", unsupported));
     }
   }
+
+  /**
+   * A modified version of Spark's LookupCatalog.CatalogAndIdentifier.unapply
+   * Attempts to find the catalog and identifier a multipart identifier represents
+   * @param nameParts Multipart identifier representing a table
+   * @return The CatalogPlugin and Identifier for the table
+   */
+  public static <C, T> Pair<C, T> catalogAndIdentifier(List<String> nameParts,
+                                                       Function<String, C> catalogProvider,
+                                                       IdentiferFunction<T> identiferProvider,
+                                                       C defaultCatalog,
+                                                       String[] currentNamespace) {
+    Preconditions.checkArgument(!nameParts.isEmpty(),
+        "Cannot determine catalog and Identifier from empty name parts");

Review comment:
       agreed, done. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r541278363



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.hive.HiveCatalogs;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+
+public final class CustomCatalogs {
+  private static final Cache<String, Catalog> CATALOG_CACHE = Caffeine.newBuilder().softValues().build();
+
+  public static final String ICEBERG_CATALOG_PREFIX = "spark.sql.catalog.iceberg.";
+  public static final String ICEBERG_CATALOG_TYPE = "type";
+  public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
+  public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
+
+  private CustomCatalogs() {
+  }
+
+  /**
+   * Build an Iceberg {@link Catalog} to be used by this Spark source adapter.
+   *
+   * The cache is to facilitate reuse of catalogs, especially if wrapped in CachingCatalog. For non-Hive catalogs all
+   * custom parameters passed to the catalog are considered in the cache key. Hive catalogs only cache based on
+   * the Metastore URIs as per previous behaviour.
+   *
+   * @param options options from Spark
+   * @return an Iceberg catalog
+   */
+  public static Catalog buildIcebergCatalog(Map<String, String> options) {
+    String name = "spark_source";
+    SparkConf sparkConf = SparkSession.active().sparkContext().getConf();
+    Map<String, String> sparkMap = Arrays.stream(sparkConf.getAllWithPrefix(ICEBERG_CATALOG_PREFIX))
+        .collect(Collectors.toMap(x -> x._1, x -> x._2));
+    sparkMap.putAll(options);
+    Configuration conf = SparkSession.active().sessionState().newHadoopConf();
+
+    String catalogImpl = sparkMap.get(CatalogProperties.CATALOG_IMPL);
+    if (catalogImpl != null) {
+      String cacheKey = options.entrySet()
+          .stream().map(x -> String.format("%s:%s", x.getKey(), x.getValue())).collect(Collectors.joining(";"));
+      return CATALOG_CACHE.get(cacheKey, x -> CatalogUtil.loadCatalog(catalogImpl, name, sparkMap, conf));

Review comment:
       I would expect the cache to delegate to `buildIcebergCatalog`, no the other way around. What about using a simple getter:
   
   ```java
   public static Catalog loadCatalog(SparkSession spark, String name) {
     return CATALOG_CACHE.get(Pair.of(spark, name));
   }
   ```
   
   Then this just needs to build the named catalog for a particular Spark session. And that could be done by moving the existing `SparkCatalog.buildIcebergCatalog` into `CatalogUtil` and calling it with the right name and config from the session.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r543528002



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+
+public final class CustomCatalogs {
+  private static final Cache<Pair<SparkSession, String>, Catalog> CATALOG_CACHE = Caffeine.newBuilder().softValues().build();
+
+  public static final String ICEBERG_DEFAULT_CATALOG = "default_catalog";
+  public static final String ICEBERG_CATALOG_PREFIX = "spark.sql.catalog";
+
+  private CustomCatalogs() {
+  }
+
+  /**
+   * Build an Iceberg {@link Catalog} to be used by this Spark source adapter.
+   *
+   * <p>
+   *   The cache is to facilitate reuse of catalogs, especially if wrapped in CachingCatalog. For non-Hive catalogs all
+   *   custom parameters passed to the catalog are considered in the cache key. Hive catalogs only cache based on
+   *   the Metastore URIs as per previous behaviour.
+   * </p>
+   *
+   * @param spark Spark Session
+   * @param name Catalog Name
+   * @return an Iceberg catalog
+   */
+  public static Catalog buildIcebergCatalog(SparkSession spark, String name) {

Review comment:
       This isn't really a "build" method any more since it wraps the cache, it is more of a "load" I think.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547545485



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -61,4 +65,42 @@ public static void validatePartitionTransforms(PartitionSpec spec) {
           String.format("Cannot write using unsupported transforms: %s", unsupported));
     }
   }
+
+  /**
+   * A modified version of Spark's LookupCatalog.CatalogAndIdentifier.unapply
+   * Attempts to find the catalog and identifier a multipart identifier represents
+   * @param nameParts Multipart identifier representing a table
+   * @return The CatalogPlugin and Identifier for the table
+   */
+  public static <C, T> Pair<C, T> catalogAndIdentifier(List<String> nameParts,
+                                                       Function<String, C> catalogProvider,
+                                                       IdentiferFunction<T> identiferProvider,
+                                                       C defaultCatalog,
+                                                       String[] currentNamespace) {
+    Preconditions.checkArgument(!nameParts.isEmpty(),
+        "Cannot determine catalog and Identifier from empty name parts");

Review comment:
       "name parts" is an internal thing. What about dropping "parts" and just referring to it as "name"?
   
   Also, no need to capitalize identifier.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547448881



##########
File path: spark2/src/test/java/org/apache/iceberg/spark/source/TestCustomCatalog.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.util.List;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public class TestCustomCatalog {
+  private static final String CATALOG_IMPL = String.format("%s.%s.%s", CustomCatalogs.ICEBERG_CATALOG_PREFIX,
+      CustomCatalogs.ICEBERG_DEFAULT_CATALOG, CatalogProperties.CATALOG_IMPL);
+  private static final String WAREHOUSE = String.format("%s.%s.%s", CustomCatalogs.ICEBERG_CATALOG_PREFIX,
+      CustomCatalogs.ICEBERG_DEFAULT_CATALOG, CatalogProperties.WAREHOUSE_LOCATION);
+  private static final String URI_KEY = String.format("%s.%s.%s", CustomCatalogs.ICEBERG_CATALOG_PREFIX,
+      CustomCatalogs.ICEBERG_DEFAULT_CATALOG, CatalogProperties.HIVE_URI);
+  private static final String URI_VAL = "thrift://localhost:12345"; // dummy uri
+  private static final String CATALOG_VAL = "org.apache.iceberg.spark.source.TestCatalog";
+  private static final TableIdentifier TABLE = TableIdentifier.of("default", "table");
+  private static final Schema SCHEMA = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(2, "data", Types.StringType.get())
+  );
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  File tableDir = null;
+  String tableLocation = null;
+  HadoopTables tables;
+
+  protected static SparkSession spark = null;
+
+  @BeforeClass
+  public static void startMetastoreAndSpark() {
+    spark = SparkSession.builder().master("local[2]").getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopMetastoreAndSpark() {
+    spark.stop();
+    spark = null;
+  }
+
+  @Before
+  public void setupTable() throws Exception {
+    this.tables = new HadoopTables(spark.sessionState().newHadoopConf());
+    this.tableDir = temp.newFolder();
+    tableDir.delete(); // created by table create
+    this.tableLocation = tableDir.toURI().toString();
+    tables.create(SCHEMA, PartitionSpec.unpartitioned(), String.format("%s/%s", tableLocation, TABLE.name()));
+  }
+
+  @After
+  public void removeTable() {
+    SparkConf sparkConf = spark.sparkContext().conf();
+    sparkConf.remove(CATALOG_IMPL);
+    sparkConf.remove(WAREHOUSE);
+    sparkConf.remove(URI_KEY);
+    tables.dropTable(String.format("%s/%s", tableLocation, TABLE.name()));
+    tableDir.delete();
+    CustomCatalogs.clearCache();
+  }
+
+  @Test
+  public void withSparkOptions() {
+
+    SparkConf sparkConf = spark.sparkContext().conf();
+    sparkConf.set(CATALOG_IMPL, CATALOG_VAL);
+    sparkConf.set(URI_KEY, URI_VAL);
+
+    List<SimpleRecord> expected = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+
+    Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
+    AssertHelpers.assertThrows("We have not set all properties", IllegalArgumentException.class, () ->

Review comment:
       Can you use the variant of this that checks the exception message?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rymurr commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rymurr commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547994442



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -61,4 +65,42 @@ public static void validatePartitionTransforms(PartitionSpec spec) {
           String.format("Cannot write using unsupported transforms: %s", unsupported));
     }
   }
+
+  /**
+   * A modified version of Spark's LookupCatalog.CatalogAndIdentifier.unapply
+   * Attempts to find the catalog and identifier a multipart identifier represents
+   * @param nameParts Multipart identifier representing a table
+   * @return The CatalogPlugin and Identifier for the table
+   */
+  public static <C, T> Pair<C, T> catalogAndIdentifier(List<String> nameParts,
+                                                       Function<String, C> catalogProvider,
+                                                       IdentiferFunction<T> identiferProvider,
+                                                       C defaultCatalog,
+                                                       String[] currentNamespace) {
+    Preconditions.checkArgument(!nameParts.isEmpty(),
+        "Cannot determine catalog and Identifier from empty name parts");
+
+    int lastElementIndex = nameParts.size() - 1;
+    String name = nameParts.get(lastElementIndex);
+
+    if (nameParts.size() == 1) {
+      // Only a single element, use current catalog and namespace
+      return Pair.of(defaultCatalog, identiferProvider.of(currentNamespace, name));
+    } else {
+      C catalog = catalogProvider.apply(nameParts.get(0));
+      if (catalog == null) {
+        // The first element was not a valid catalog, treat it like part of the namespace
+        String[] namespace =  nameParts.subList(0, lastElementIndex).toArray(new String[0]);
+        return Pair.of(defaultCatalog, identiferProvider.of(namespace, name));
+      } else {
+        // Assume the first element is a valid catalog
+        String[] namespace = nameParts.subList(1, lastElementIndex).toArray(new String[0]);
+        return Pair.of(catalog, identiferProvider.of(namespace, name));
+      }
+    }
+  }
+
+  public interface IdentiferFunction<T> {

Review comment:
       agreed, don't know why I couldn't find that before




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rymurr commented on pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rymurr commented on pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#issuecomment-751807334


   > Thanks for the update! Looks almost ready now.
   
   Thanks @rdblue I have addressed all but one of your comments. A bit of clarification on that one would be helpful.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547544750



##########
File path: core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java
##########
@@ -68,45 +70,62 @@
  *
  * Note: The HadoopCatalog requires that the underlying file system supports atomic rename.
  */
-public class HadoopCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces {
+public class HadoopCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
 
   private static final String ICEBERG_HADOOP_WAREHOUSE_BASE = "iceberg/warehouse";
   private static final String TABLE_METADATA_FILE_EXTENSION = ".metadata.json";
   private static final Joiner SLASH = Joiner.on("/");
   private static final PathFilter TABLE_FILTER = path -> path.getName().endsWith(TABLE_METADATA_FILE_EXTENSION);
 
-  private final String catalogName;
-  private final Configuration conf;
-  private final String warehouseLocation;
-  private final FileSystem fs;
-  private final FileIO fileIO;
+  private String catalogName;
+  private Configuration conf;
+  private String warehouseLocation;
+  private FileSystem fs;
+  private FileIO fileIO;
+
+  public HadoopCatalog(){
+  }
 
   /**
    * The constructor of the HadoopCatalog. It uses the passed location as its warehouse directory.
    *
+   * @deprecated please use the no-arg constructor, setConf and initialize to construct the catalog
    * @param name The catalog name
    * @param conf The Hadoop configuration
    * @param warehouseLocation The location used as warehouse directory
    */
+  @Deprecated
   public HadoopCatalog(String name, Configuration conf, String warehouseLocation) {
     this(name, conf, warehouseLocation, Maps.newHashMap());
   }
 
   /**
    * The all-arg constructor of the HadoopCatalog.
    *
+   * @deprecated please use the no-arg constructor, setConf and initialize to construct the catalog
    * @param name The catalog name
    * @param conf The Hadoop configuration
    * @param warehouseLocation The location used as warehouse directory
    * @param properties catalog properties
    */
+  @Deprecated
   public HadoopCatalog(String name, Configuration conf, String warehouseLocation, Map<String, String> properties) {
     Preconditions.checkArgument(warehouseLocation != null && !warehouseLocation.equals(""),
         "no location provided for warehouse");
+    setConf(conf);

Review comment:
       Okay, I was wrong then. The important thing is being consistent so you've already done that.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547547967



##########
File path: spark2/src/test/java/org/apache/iceberg/spark/source/TestCustomCatalog.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.util.List;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalog.Catalog;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public class TestCustomCatalog {
+  private static final String CATALOG_IMPL = String.format("%s.%s.%s", CustomCatalogs.ICEBERG_CATALOG_PREFIX,
+      CustomCatalogs.ICEBERG_DEFAULT_CATALOG, CatalogProperties.CATALOG_IMPL);
+  private static final String WAREHOUSE = String.format("%s.%s.%s", CustomCatalogs.ICEBERG_CATALOG_PREFIX,
+      CustomCatalogs.ICEBERG_DEFAULT_CATALOG, CatalogProperties.WAREHOUSE_LOCATION);
+  private static final String URI_KEY = String.format("%s.%s.%s", CustomCatalogs.ICEBERG_CATALOG_PREFIX,
+      CustomCatalogs.ICEBERG_DEFAULT_CATALOG, CatalogProperties.HIVE_URI);
+  private static final String URI_VAL = "thrift://localhost:12345"; // dummy uri
+  private static final String CATALOG_VAL = "org.apache.iceberg.spark.source.TestCatalog";
+  private static final TableIdentifier TABLE = TableIdentifier.of("default", "table");
+  private static final Schema SCHEMA = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(2, "data", Types.StringType.get())
+  );
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  File tableDir = null;
+  String tableLocation = null;
+  HadoopTables tables;
+
+  protected static SparkSession spark = null;
+
+  @BeforeClass
+  public static void startMetastoreAndSpark() {
+    spark = SparkSession.builder().master("local[2]").getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopMetastoreAndSpark() {
+    spark.stop();
+    spark = null;
+  }
+
+  @Before
+  public void setupTable() throws Exception {
+    SparkConf sparkConf = spark.sparkContext().conf();
+    sparkConf.set(
+        String.format("%s.%s", CustomCatalogs.ICEBERG_CATALOG_PREFIX, CustomCatalogs.ICEBERG_DEFAULT_CATALOG),
+        Catalog.class.getName());
+    this.tables = new HadoopTables(spark.sessionState().newHadoopConf());
+    this.tableDir = temp.newFolder();
+    tableDir.delete(); // created by table create
+    this.tableLocation = tableDir.toURI().toString();
+    tables.create(SCHEMA, PartitionSpec.unpartitioned(), String.format("%s/%s", tableLocation, TABLE.name()));
+  }
+
+  @After
+  public void removeTable() {
+    SparkConf sparkConf = spark.sparkContext().conf();
+    sparkConf.remove(CATALOG_IMPL);
+    sparkConf.remove(WAREHOUSE);
+    sparkConf.remove(URI_KEY);
+    tables.dropTable(String.format("%s/%s", tableLocation, TABLE.name()));
+    tableDir.delete();
+    CustomCatalogs.clearCache();
+  }
+
+  @Test
+  public void withSparkOptions() {

Review comment:
       Can you also add a test for a `catalog.db.table` name?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547543006



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+
+public final class CustomCatalogs {
+  private static final Cache<Pair<SparkSession, String>, Catalog> CATALOG_CACHE = Caffeine.newBuilder().softValues().build();
+
+  public static final String ICEBERG_DEFAULT_CATALOG = "default_catalog";
+  public static final String ICEBERG_CATALOG_PREFIX = "spark.sql.catalog";
+
+  private CustomCatalogs() {
+  }
+
+  /**
+   * Build an Iceberg {@link Catalog} to be used by this Spark source adapter.
+   *
+   * <p>
+   *   The cache is to facilitate reuse of catalogs, especially if wrapped in CachingCatalog. For non-Hive catalogs all
+   *   custom parameters passed to the catalog are considered in the cache key. Hive catalogs only cache based on
+   *   the Metastore URIs as per previous behaviour.
+   * </p>
+   *
+   * @param spark Spark Session
+   * @param name Catalog Name
+   * @return an Iceberg catalog
+   */
+  public static Catalog buildIcebergCatalog(SparkSession spark, String name) {
+    return CATALOG_CACHE.get(Pair.of(spark, name), CustomCatalogs::build);
+  }
+
+  private static Catalog build(Pair<SparkSession, String> sparkAndName) {
+    SparkSession spark = sparkAndName.first();
+    String name =  sparkAndName.second() == null ? ICEBERG_DEFAULT_CATALOG : sparkAndName.second();
+    SparkConf sparkConf = spark.sparkContext().getConf();
+    Configuration conf = spark.sessionState().newHadoopConf();
+
+    String catalogPrefix = String.format("%s.%s.", ICEBERG_CATALOG_PREFIX, name);
+    Map<String, String> options = Arrays.stream(sparkConf.getAllWithPrefix(catalogPrefix))
+        .collect(Collectors.toMap(x -> x._1, x -> x._2));
+
+    if (options.isEmpty() && !name.equals(ICEBERG_DEFAULT_CATALOG)) {
+      throw new IllegalArgumentException(String.format("Cannot instantiate catalog %s. Incorrect Parameters", name));

Review comment:
       Yep! I would say that this should just respect Spark 3 settings.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rymurr commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rymurr commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r548051961



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -61,4 +65,42 @@ public static void validatePartitionTransforms(PartitionSpec spec) {
           String.format("Cannot write using unsupported transforms: %s", unsupported));
     }
   }
+
+  /**
+   * A modified version of Spark's LookupCatalog.CatalogAndIdentifier.unapply
+   * Attempts to find the catalog and identifier a multipart identifier represents
+   * @param nameParts Multipart identifier representing a table
+   * @return The CatalogPlugin and Identifier for the table
+   */
+  public static <C, T> Pair<C, T> catalogAndIdentifier(List<String> nameParts,
+                                                       Function<String, C> catalogProvider,
+                                                       IdentiferFunction<T> identiferProvider,
+                                                       C defaultCatalog,
+                                                       String[] currentNamespace) {
+    Preconditions.checkArgument(!nameParts.isEmpty(),
+        "Cannot determine catalog and Identifier from empty name parts");
+
+    int lastElementIndex = nameParts.size() - 1;
+    String name = nameParts.get(lastElementIndex);
+
+    if (nameParts.size() == 1) {
+      // Only a single element, use current catalog and namespace
+      return Pair.of(defaultCatalog, identiferProvider.of(currentNamespace, name));
+    } else {
+      C catalog = catalogProvider.apply(nameParts.get(0));
+      if (catalog == null) {
+        // The first element was not a valid catalog, treat it like part of the namespace
+        String[] namespace =  nameParts.subList(0, lastElementIndex).toArray(new String[0]);

Review comment:
       actually, scratch that. It does make sense. It takes [a,b,c] as the namespace and d as the name for [a,b,c,d]. So I have left it in.
   
   The other one returns a as the catalog [b,c] as the namespace and d as the table.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r549513287



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+
+public final class CustomCatalogs {
+  private static final Cache<Pair<SparkSession, String>, Catalog> CATALOG_CACHE = Caffeine.newBuilder()
+      .softValues().build();
+
+  public static final String ICEBERG_DEFAULT_CATALOG = "default_catalog";
+  public static final String ICEBERG_CATALOG_PREFIX = "spark.sql.catalog";
+
+  private CustomCatalogs() {
+  }
+
+  /**
+   * Build an Iceberg {@link Catalog} to be used by this Spark source adapter.
+   *
+   * <p>
+   * The cache is to facilitate reuse of catalogs, especially if wrapped in CachingCatalog. For non-Hive catalogs all
+   * custom parameters passed to the catalog are considered in the cache key. Hive catalogs only cache based on
+   * the Metastore URIs as per previous behaviour.
+   *
+   *
+   * @param spark Spark Session
+   * @param name Catalog Name
+   * @return an Iceberg catalog
+   */
+  public static Catalog buildIcebergCatalog(SparkSession spark, String name) {
+    return CATALOG_CACHE.get(Pair.of(spark, name), CustomCatalogs::load);
+  }
+
+  private static Catalog load(Pair<SparkSession, String> sparkAndName) {
+    SparkSession spark = sparkAndName.first();
+    String name = sparkAndName.second();
+    SparkConf sparkConf = spark.sparkContext().getConf();
+    Configuration conf = spark.sessionState().newHadoopConf();
+
+    String catalogPrefix = String.format("%s.%s", ICEBERG_CATALOG_PREFIX, name);
+    String catalogName = sparkConf.get(catalogPrefix, null);
+    if (!name.equals(ICEBERG_DEFAULT_CATALOG) &&
+        !org.apache.spark.sql.catalog.Catalog.class.getName().equals(catalogName)) {

Review comment:
       This is minor, so I'm merging without it. We can clean it up next time we touch this code.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547441593



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.hive.HiveCatalogs;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+
+public final class CustomCatalogs {
+  private static final Cache<String, Catalog> CATALOG_CACHE = Caffeine.newBuilder().softValues().build();
+
+  public static final String ICEBERG_CATALOG_PREFIX = "spark.sql.catalog.iceberg.";
+  public static final String ICEBERG_CATALOG_TYPE = "type";
+  public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
+  public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
+
+  private CustomCatalogs() {
+  }
+
+  /**
+   * Build an Iceberg {@link Catalog} to be used by this Spark source adapter.
+   *

Review comment:
       The usual style is to use only `<p>` on the newline between paragraphs.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rymurr commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rymurr commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547516199



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+
+public final class CustomCatalogs {
+  private static final Cache<Pair<SparkSession, String>, Catalog> CATALOG_CACHE = Caffeine.newBuilder().softValues().build();
+
+  public static final String ICEBERG_DEFAULT_CATALOG = "default_catalog";
+  public static final String ICEBERG_CATALOG_PREFIX = "spark.sql.catalog";
+
+  private CustomCatalogs() {
+  }
+
+  /**
+   * Build an Iceberg {@link Catalog} to be used by this Spark source adapter.
+   *
+   * <p>
+   *   The cache is to facilitate reuse of catalogs, especially if wrapped in CachingCatalog. For non-Hive catalogs all
+   *   custom parameters passed to the catalog are considered in the cache key. Hive catalogs only cache based on
+   *   the Metastore URIs as per previous behaviour.
+   * </p>
+   *
+   * @param spark Spark Session
+   * @param name Catalog Name
+   * @return an Iceberg catalog
+   */
+  public static Catalog buildIcebergCatalog(SparkSession spark, String name) {
+    return CATALOG_CACHE.get(Pair.of(spark, name), CustomCatalogs::build);
+  }
+
+  private static Catalog build(Pair<SparkSession, String> sparkAndName) {
+    SparkSession spark = sparkAndName.first();
+    String name =  sparkAndName.second() == null ? ICEBERG_DEFAULT_CATALOG : sparkAndName.second();
+    SparkConf sparkConf = spark.sparkContext().getConf();
+    Configuration conf = spark.sessionState().newHadoopConf();
+
+    String catalogPrefix = String.format("%s.%s.", ICEBERG_CATALOG_PREFIX, name);
+    Map<String, String> options = Arrays.stream(sparkConf.getAllWithPrefix(catalogPrefix))
+        .collect(Collectors.toMap(x -> x._1, x -> x._2));
+
+    if (options.isEmpty() && !name.equals(ICEBERG_DEFAULT_CATALOG)) {
+      throw new IllegalArgumentException(String.format("Cannot instantiate catalog %s. Incorrect Parameters", name));

Review comment:
       To put it another way. If spark is set up to with `spark.sql.catalog.name` set then behave as if you are Spark3 and return a custom catalog. Otherwise return Hive.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547440114



##########
File path: core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java
##########
@@ -68,45 +70,62 @@
  *
  * Note: The HadoopCatalog requires that the underlying file system supports atomic rename.
  */
-public class HadoopCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces {
+public class HadoopCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
 
   private static final String ICEBERG_HADOOP_WAREHOUSE_BASE = "iceberg/warehouse";
   private static final String TABLE_METADATA_FILE_EXTENSION = ".metadata.json";
   private static final Joiner SLASH = Joiner.on("/");
   private static final PathFilter TABLE_FILTER = path -> path.getName().endsWith(TABLE_METADATA_FILE_EXTENSION);
 
-  private final String catalogName;
-  private final Configuration conf;
-  private final String warehouseLocation;
-  private final FileSystem fs;
-  private final FileIO fileIO;
+  private String catalogName;
+  private Configuration conf;
+  private String warehouseLocation;
+  private FileSystem fs;
+  private FileIO fileIO;
+
+  public HadoopCatalog(){
+  }
 
   /**
    * The constructor of the HadoopCatalog. It uses the passed location as its warehouse directory.
    *
+   * @deprecated please use the no-arg constructor, setConf and initialize to construct the catalog
    * @param name The catalog name
    * @param conf The Hadoop configuration
    * @param warehouseLocation The location used as warehouse directory
    */
+  @Deprecated
   public HadoopCatalog(String name, Configuration conf, String warehouseLocation) {
     this(name, conf, warehouseLocation, Maps.newHashMap());
   }
 
   /**
    * The all-arg constructor of the HadoopCatalog.
    *
+   * @deprecated please use the no-arg constructor, setConf and initialize to construct the catalog
    * @param name The catalog name
    * @param conf The Hadoop configuration
    * @param warehouseLocation The location used as warehouse directory
    * @param properties catalog properties
    */
+  @Deprecated
   public HadoopCatalog(String name, Configuration conf, String warehouseLocation, Map<String, String> properties) {
     Preconditions.checkArgument(warehouseLocation != null && !warehouseLocation.equals(""),
         "no location provided for warehouse");
+    setConf(conf);

Review comment:
       Can you call this last? I think it is typically set after calling `initialize`. The important thing is to try to use the same order.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rymurr commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rymurr commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547996370



##########
File path: spark2/src/test/java/org/apache/iceberg/spark/source/TestCustomCatalog.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.util.List;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalog.Catalog;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public class TestCustomCatalog {
+  private static final String CATALOG_IMPL = String.format("%s.%s.%s", CustomCatalogs.ICEBERG_CATALOG_PREFIX,
+      CustomCatalogs.ICEBERG_DEFAULT_CATALOG, CatalogProperties.CATALOG_IMPL);
+  private static final String WAREHOUSE = String.format("%s.%s.%s", CustomCatalogs.ICEBERG_CATALOG_PREFIX,
+      CustomCatalogs.ICEBERG_DEFAULT_CATALOG, CatalogProperties.WAREHOUSE_LOCATION);
+  private static final String URI_KEY = String.format("%s.%s.%s", CustomCatalogs.ICEBERG_CATALOG_PREFIX,
+      CustomCatalogs.ICEBERG_DEFAULT_CATALOG, CatalogProperties.HIVE_URI);
+  private static final String URI_VAL = "thrift://localhost:12345"; // dummy uri
+  private static final String CATALOG_VAL = "org.apache.iceberg.spark.source.TestCatalog";
+  private static final TableIdentifier TABLE = TableIdentifier.of("default", "table");
+  private static final Schema SCHEMA = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(2, "data", Types.StringType.get())
+  );
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  File tableDir = null;
+  String tableLocation = null;
+  HadoopTables tables;
+
+  protected static SparkSession spark = null;
+
+  @BeforeClass
+  public static void startMetastoreAndSpark() {
+    spark = SparkSession.builder().master("local[2]").getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopMetastoreAndSpark() {
+    spark.stop();
+    spark = null;
+  }
+
+  @Before
+  public void setupTable() throws Exception {
+    SparkConf sparkConf = spark.sparkContext().conf();
+    sparkConf.set(
+        String.format("%s.%s", CustomCatalogs.ICEBERG_CATALOG_PREFIX, CustomCatalogs.ICEBERG_DEFAULT_CATALOG),
+        Catalog.class.getName());
+    this.tables = new HadoopTables(spark.sessionState().newHadoopConf());
+    this.tableDir = temp.newFolder();
+    tableDir.delete(); // created by table create
+    this.tableLocation = tableDir.toURI().toString();
+    tables.create(SCHEMA, PartitionSpec.unpartitioned(), String.format("%s/%s", tableLocation, TABLE.name()));
+  }
+
+  @After
+  public void removeTable() {
+    SparkConf sparkConf = spark.sparkContext().conf();
+    sparkConf.remove(CATALOG_IMPL);
+    sparkConf.remove(WAREHOUSE);
+    sparkConf.remove(URI_KEY);
+    tables.dropTable(String.format("%s/%s", tableLocation, TABLE.name()));
+    tableDir.delete();
+    CustomCatalogs.clearCache();
+  }
+
+  @Test
+  public void withSparkOptions() {

Review comment:
       sorry, could you explain a bit more?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue merged pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547446128



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -61,4 +65,40 @@ public static void validatePartitionTransforms(PartitionSpec spec) {
           String.format("Cannot write using unsupported transforms: %s", unsupported));
     }
   }
+
+  /**
+   * A modified version of Spark's LookupCatalog.CatalogAndIdentifier.unapply
+   * Attempts to find the catalog and identifier a multipart identifier represents
+   * @param nameParts Multipart identifier representing a table
+   * @return The CatalogPlugin and Identifier for the table
+   */
+  public static <C, T> Pair<C, T> catalogAndIdentifier(List<String> nameParts,
+                                                       Function<String, C> catalog,
+                                                       IdentiferFunction<T> identifer,
+                                                       String[] currentNamespace) {
+    Preconditions.checkArgument(!nameParts.isEmpty(),
+        "Cannot determine catalog and Identifier from empty name parts");
+
+    int lastElementIndex = nameParts.size() - 1;
+    String name = nameParts.get(lastElementIndex);
+
+    if (nameParts.size() == 1) {
+      // Only a single element, use current catalog and namespace
+      return Pair.of(catalog.apply(null), identifer.of(currentNamespace, name));
+    } else {
+      try {

Review comment:
       Rather than try/catch, I think this should check whether `catalog.apply` returns null. If the result is null, then the catalog does not exist and it should not be set in the pair (set null). Then the caller can fill in the default catalog.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rymurr commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rymurr commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r542477426



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.hive.HiveCatalogs;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+
+public final class CustomCatalogs {
+  private static final Cache<String, Catalog> CATALOG_CACHE = Caffeine.newBuilder().softValues().build();
+
+  public static final String ICEBERG_CATALOG_PREFIX = "spark.sql.catalog.iceberg.";
+  public static final String ICEBERG_CATALOG_TYPE = "type";
+  public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
+  public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
+
+  private CustomCatalogs() {
+  }
+
+  /**
+   * Build an Iceberg {@link Catalog} to be used by this Spark source adapter.
+   *
+   * The cache is to facilitate reuse of catalogs, especially if wrapped in CachingCatalog. For non-Hive catalogs all
+   * custom parameters passed to the catalog are considered in the cache key. Hive catalogs only cache based on
+   * the Metastore URIs as per previous behaviour.
+   *
+   * @param options options from Spark
+   * @return an Iceberg catalog
+   */
+  public static Catalog buildIcebergCatalog(Map<String, String> options) {
+    String name = "spark_source";
+    SparkConf sparkConf = SparkSession.active().sparkContext().getConf();
+    Map<String, String> sparkMap = Arrays.stream(sparkConf.getAllWithPrefix(ICEBERG_CATALOG_PREFIX))
+        .collect(Collectors.toMap(x -> x._1, x -> x._2));
+    sparkMap.putAll(options);
+    Configuration conf = SparkSession.active().sessionState().newHadoopConf();
+
+    String catalogImpl = sparkMap.get(CatalogProperties.CATALOG_IMPL);
+    if (catalogImpl != null) {
+      String cacheKey = options.entrySet()
+          .stream().map(x -> String.format("%s:%s", x.getKey(), x.getValue())).collect(Collectors.joining(";"));
+      return CATALOG_CACHE.get(cacheKey, x -> CatalogUtil.loadCatalog(catalogImpl, name, sparkMap, conf));

Review comment:
       I was thinking on similar lines at first also. The main reason i didn't is the `HiveCatalog` isn't in `iceberg-core` and we would get a circular dep between core and hive-metastore. We can change `HiveCatalog` to instantiate it similar to the custom catalogs and create it via reflection? or move the `buildIcebergCatalog` method higher up (eg spark module)

##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.hive.HiveCatalogs;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+
+public final class CustomCatalogs {
+  private static final Cache<String, Catalog> CATALOG_CACHE = Caffeine.newBuilder().softValues().build();
+
+  public static final String ICEBERG_CATALOG_PREFIX = "spark.sql.catalog.iceberg.";
+  public static final String ICEBERG_CATALOG_TYPE = "type";
+  public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
+  public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
+
+  private CustomCatalogs() {
+  }
+
+  /**
+   * Build an Iceberg {@link Catalog} to be used by this Spark source adapter.
+   *

Review comment:
       :+1:




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rymurr commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rymurr commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547545053



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+
+public final class CustomCatalogs {
+  private static final Cache<Pair<SparkSession, String>, Catalog> CATALOG_CACHE = Caffeine.newBuilder().softValues().build();
+
+  public static final String ICEBERG_DEFAULT_CATALOG = "default_catalog";
+  public static final String ICEBERG_CATALOG_PREFIX = "spark.sql.catalog";
+
+  private CustomCatalogs() {
+  }
+
+  /**
+   * Build an Iceberg {@link Catalog} to be used by this Spark source adapter.
+   *
+   * <p>
+   *   The cache is to facilitate reuse of catalogs, especially if wrapped in CachingCatalog. For non-Hive catalogs all
+   *   custom parameters passed to the catalog are considered in the cache key. Hive catalogs only cache based on
+   *   the Metastore URIs as per previous behaviour.
+   * </p>
+   *
+   * @param spark Spark Session
+   * @param name Catalog Name
+   * @return an Iceberg catalog
+   */
+  public static Catalog buildIcebergCatalog(SparkSession spark, String name) {
+    return CATALOG_CACHE.get(Pair.of(spark, name), CustomCatalogs::build);
+  }
+
+  private static Catalog build(Pair<SparkSession, String> sparkAndName) {
+    SparkSession spark = sparkAndName.first();
+    String name =  sparkAndName.second() == null ? ICEBERG_DEFAULT_CATALOG : sparkAndName.second();
+    SparkConf sparkConf = spark.sparkContext().getConf();
+    Configuration conf = spark.sessionState().newHadoopConf();
+
+    String catalogPrefix = String.format("%s.%s.", ICEBERG_CATALOG_PREFIX, name);
+    Map<String, String> options = Arrays.stream(sparkConf.getAllWithPrefix(catalogPrefix))
+        .collect(Collectors.toMap(x -> x._1, x -> x._2));
+
+    if (options.isEmpty() && !name.equals(ICEBERG_DEFAULT_CATALOG)) {
+      throw new IllegalArgumentException(String.format("Cannot instantiate catalog %s. Incorrect Parameters", name));

Review comment:
       cool. I have attempted to do that here. The complication is the same method fetches default catalog and custom catalog so there is an extra check there. Can separate out if you prefer. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rymurr commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rymurr commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r543670252



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -61,4 +65,40 @@ public static void validatePartitionTransforms(PartitionSpec spec) {
           String.format("Cannot write using unsupported transforms: %s", unsupported));
     }
   }
+
+  /**
+   * A modified version of Spark's LookupCatalog.CatalogAndIdentifier.unapply
+   * Attempts to find the catalog and identifier a multipart identifier represents
+   * @param nameParts Multipart identifier representing a table
+   * @return The CatalogPlugin and Identifier for the table
+   */
+  public static <C, T> Pair<C,T> catalogAndIdentifier(List<String> nameParts,
+                                                       Function<String, C> catalog,
+                                                       IdentiferFunction<T> identifer,
+                                                       String[] currentNamespace) {

Review comment:
       fixe

##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -61,4 +65,40 @@ public static void validatePartitionTransforms(PartitionSpec spec) {
           String.format("Cannot write using unsupported transforms: %s", unsupported));
     }
   }
+
+  /**
+   * A modified version of Spark's LookupCatalog.CatalogAndIdentifier.unapply
+   * Attempts to find the catalog and identifier a multipart identifier represents
+   * @param nameParts Multipart identifier representing a table
+   * @return The CatalogPlugin and Identifier for the table
+   */
+  public static <C, T> Pair<C,T> catalogAndIdentifier(List<String> nameParts,
+                                                       Function<String, C> catalog,
+                                                       IdentiferFunction<T> identifer,
+                                                       String[] currentNamespace) {

Review comment:
       fixed

##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+
+public final class CustomCatalogs {
+  private static final Cache<Pair<SparkSession, String>, Catalog> CATALOG_CACHE = Caffeine.newBuilder().softValues().build();
+
+  public static final String ICEBERG_DEFAULT_CATALOG = "default_catalog";
+  public static final String ICEBERG_CATALOG_PREFIX = "spark.sql.catalog";
+
+  private CustomCatalogs() {
+  }
+
+  /**
+   * Build an Iceberg {@link Catalog} to be used by this Spark source adapter.
+   *
+   * <p>
+   *   The cache is to facilitate reuse of catalogs, especially if wrapped in CachingCatalog. For non-Hive catalogs all
+   *   custom parameters passed to the catalog are considered in the cache key. Hive catalogs only cache based on
+   *   the Metastore URIs as per previous behaviour.
+   * </p>
+   *
+   * @param spark Spark Session
+   * @param name Catalog Name
+   * @return an Iceberg catalog
+   */
+  public static Catalog buildIcebergCatalog(SparkSession spark, String name) {

Review comment:
       fixed

##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -24,16 +24,16 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.hadoop.HadoopTables;
-import org.apache.iceberg.hive.HiveCatalog;
-import org.apache.iceberg.hive.HiveCatalogs;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkUtil;
 import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.Pair;

Review comment:
       fixed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r541276011



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.hive.HiveCatalogs;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+
+public final class CustomCatalogs {
+  private static final Cache<String, Catalog> CATALOG_CACHE = Caffeine.newBuilder().softValues().build();
+
+  public static final String ICEBERG_CATALOG_PREFIX = "spark.sql.catalog.iceberg.";
+  public static final String ICEBERG_CATALOG_TYPE = "type";
+  public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
+  public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
+
+  private CustomCatalogs() {
+  }
+
+  /**
+   * Build an Iceberg {@link Catalog} to be used by this Spark source adapter.
+   *
+   * The cache is to facilitate reuse of catalogs, especially if wrapped in CachingCatalog. For non-Hive catalogs all
+   * custom parameters passed to the catalog are considered in the cache key. Hive catalogs only cache based on
+   * the Metastore URIs as per previous behaviour.
+   *
+   * @param options options from Spark
+   * @return an Iceberg catalog
+   */
+  public static Catalog buildIcebergCatalog(Map<String, String> options) {

Review comment:
       It looks like the options here are the read options. I don't think that it is necessary to pass any of the read options to create a catalog. In Spark 3, the catalogs exist and are configured using Spark config and the read options are independent.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547445129



##########
File path: core/src/main/java/org/apache/iceberg/CatalogUtil.java
##########
@@ -169,6 +175,26 @@ public static Catalog loadCatalog(
     return catalog;
   }
 
+  public static Catalog buildIcebergCatalog(String name, Map<String, String> options, Configuration conf) {
+

Review comment:
       No need to start methods with a newline.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547440349



##########
File path: core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java
##########
@@ -68,45 +70,62 @@
  *
  * Note: The HadoopCatalog requires that the underlying file system supports atomic rename.
  */
-public class HadoopCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces {
+public class HadoopCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
 
   private static final String ICEBERG_HADOOP_WAREHOUSE_BASE = "iceberg/warehouse";
   private static final String TABLE_METADATA_FILE_EXTENSION = ".metadata.json";
   private static final Joiner SLASH = Joiner.on("/");
   private static final PathFilter TABLE_FILTER = path -> path.getName().endsWith(TABLE_METADATA_FILE_EXTENSION);
 
-  private final String catalogName;
-  private final Configuration conf;
-  private final String warehouseLocation;
-  private final FileSystem fs;
-  private final FileIO fileIO;
+  private String catalogName;
+  private Configuration conf;
+  private String warehouseLocation;
+  private FileSystem fs;
+  private FileIO fileIO;
+
+  public HadoopCatalog(){
+  }
 
   /**
    * The constructor of the HadoopCatalog. It uses the passed location as its warehouse directory.
    *
+   * @deprecated please use the no-arg constructor, setConf and initialize to construct the catalog
    * @param name The catalog name
    * @param conf The Hadoop configuration
    * @param warehouseLocation The location used as warehouse directory
    */
+  @Deprecated
   public HadoopCatalog(String name, Configuration conf, String warehouseLocation) {
     this(name, conf, warehouseLocation, Maps.newHashMap());
   }
 
   /**
    * The all-arg constructor of the HadoopCatalog.
    *
+   * @deprecated please use the no-arg constructor, setConf and initialize to construct the catalog

Review comment:
       Can you also add when this will be removed? Like "will be removed in 0.12.0".




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547445085



##########
File path: core/src/main/java/org/apache/iceberg/CatalogUtil.java
##########
@@ -169,6 +175,26 @@ public static Catalog loadCatalog(
     return catalog;
   }
 
+  public static Catalog buildIcebergCatalog(String name, Map<String, String> options, Configuration conf) {
+
+    String catalogImpl = options.get(CatalogProperties.CATALOG_IMPL);
+    if (catalogImpl == null) {
+      String catalogType = options.getOrDefault(ICEBERG_CATALOG_TYPE, ICEBERG_CATALOG_TYPE_HIVE);
+      switch (catalogType.toLowerCase(Locale.ENGLISH)) {
+        case ICEBERG_CATALOG_TYPE_HIVE:
+          catalogImpl = ICEBERG_CATALOG_HIVE;
+          break;
+        case ICEBERG_CATALOG_TYPE_HADOOP:
+          catalogImpl = ICEBERG_CATALOG_HADOOP;
+          break;
+        default:
+          throw new UnsupportedOperationException("Unknown catalog type: " + catalogType);
+      }
+    }
+    return CatalogUtil.loadCatalog(catalogImpl, name, options, conf);
+

Review comment:
       Looks like this newline should be above the return to separate it from the control flow.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r543526987



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+
+public final class CustomCatalogs {
+  private static final Cache<Pair<SparkSession, String>, Catalog> CATALOG_CACHE = Caffeine.newBuilder().softValues().build();
+
+  public static final String ICEBERG_DEFAULT_CATALOG = "default_catalog";
+  public static final String ICEBERG_CATALOG_PREFIX = "spark.sql.catalog";
+
+  private CustomCatalogs() {
+  }
+
+  /**
+   * Build an Iceberg {@link Catalog} to be used by this Spark source adapter.
+   *
+   * <p>
+   *   The cache is to facilitate reuse of catalogs, especially if wrapped in CachingCatalog. For non-Hive catalogs all
+   *   custom parameters passed to the catalog are considered in the cache key. Hive catalogs only cache based on
+   *   the Metastore URIs as per previous behaviour.
+   * </p>
+   *
+   * @param spark Spark Session
+   * @param name Catalog Name
+   * @return an Iceberg catalog
+   */
+  public static Catalog buildIcebergCatalog(SparkSession spark, String name) {
+    return CATALOG_CACHE.get(Pair.of(spark, name), CustomCatalogs::build);
+  }
+
+  private static Catalog build(Pair<SparkSession, String> sparkAndName) {
+    SparkSession spark = sparkAndName.first();
+    String name =  sparkAndName.second() == null ? ICEBERG_DEFAULT_CATALOG : sparkAndName.second();
+    SparkConf sparkConf = spark.sparkContext().getConf();
+    Configuration conf = spark.sessionState().newHadoopConf();
+
+    String catalogPrefix = String.format("%s.%s.", ICEBERG_CATALOG_PREFIX, name);
+    Map<String, String> options = Arrays.stream(sparkConf.getAllWithPrefix(catalogPrefix))
+        .collect(Collectors.toMap(x -> x._1, x -> x._2));
+
+    if (options.isEmpty() && !name.equals(ICEBERG_DEFAULT_CATALOG)) {
+      throw new IllegalArgumentException(String.format("Cannot instantiate catalog %s. Incorrect Parameters", name));

Review comment:
       This seems like a concern for `CatalogUtil.buildIcebergCatalog`, not here. This should create the map and pass it on to allow that method to reject the options. That way, the user gets a more specific error, like "Missing catalog implementation class or type". That can also create a default catalog if we choose to later -- though I'm skeptical we would -- without needing to change all of the callers.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r549507747



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+
+public final class CustomCatalogs {
+  private static final Cache<Pair<SparkSession, String>, Catalog> CATALOG_CACHE = Caffeine.newBuilder()
+      .softValues().build();
+
+  public static final String ICEBERG_DEFAULT_CATALOG = "default_catalog";
+  public static final String ICEBERG_CATALOG_PREFIX = "spark.sql.catalog";
+
+  private CustomCatalogs() {
+  }
+
+  /**
+   * Build an Iceberg {@link Catalog} to be used by this Spark source adapter.
+   *
+   * <p>
+   * The cache is to facilitate reuse of catalogs, especially if wrapped in CachingCatalog. For non-Hive catalogs all
+   * custom parameters passed to the catalog are considered in the cache key. Hive catalogs only cache based on
+   * the Metastore URIs as per previous behaviour.
+   *
+   *
+   * @param spark Spark Session
+   * @param name Catalog Name
+   * @return an Iceberg catalog
+   */
+  public static Catalog buildIcebergCatalog(SparkSession spark, String name) {
+    return CATALOG_CACHE.get(Pair.of(spark, name), CustomCatalogs::load);
+  }
+
+  private static Catalog load(Pair<SparkSession, String> sparkAndName) {
+    SparkSession spark = sparkAndName.first();
+    String name = sparkAndName.second();
+    SparkConf sparkConf = spark.sparkContext().getConf();
+    Configuration conf = spark.sessionState().newHadoopConf();
+
+    String catalogPrefix = String.format("%s.%s", ICEBERG_CATALOG_PREFIX, name);
+    String catalogName = sparkConf.get(catalogPrefix, null);
+    if (!name.equals(ICEBERG_DEFAULT_CATALOG) &&
+        !org.apache.spark.sql.catalog.Catalog.class.getName().equals(catalogName)) {

Review comment:
       The variable name `catalogName` seems incorrect because this is the catalog implementation class in Spark. A better variable name would be `catalogImpl`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rymurr commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rymurr commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r543675340



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+
+public final class CustomCatalogs {
+  private static final Cache<Pair<SparkSession, String>, Catalog> CATALOG_CACHE = Caffeine.newBuilder().softValues().build();
+
+  public static final String ICEBERG_DEFAULT_CATALOG = "default_catalog";
+  public static final String ICEBERG_CATALOG_PREFIX = "spark.sql.catalog";
+
+  private CustomCatalogs() {
+  }
+
+  /**
+   * Build an Iceberg {@link Catalog} to be used by this Spark source adapter.
+   *
+   * <p>
+   *   The cache is to facilitate reuse of catalogs, especially if wrapped in CachingCatalog. For non-Hive catalogs all
+   *   custom parameters passed to the catalog are considered in the cache key. Hive catalogs only cache based on
+   *   the Metastore URIs as per previous behaviour.
+   * </p>
+   *
+   * @param spark Spark Session
+   * @param name Catalog Name
+   * @return an Iceberg catalog
+   */
+  public static Catalog buildIcebergCatalog(SparkSession spark, String name) {
+    return CATALOG_CACHE.get(Pair.of(spark, name), CustomCatalogs::build);
+  }
+
+  private static Catalog build(Pair<SparkSession, String> sparkAndName) {
+    SparkSession spark = sparkAndName.first();
+    String name =  sparkAndName.second() == null ? ICEBERG_DEFAULT_CATALOG : sparkAndName.second();
+    SparkConf sparkConf = spark.sparkContext().getConf();
+    Configuration conf = spark.sessionState().newHadoopConf();
+
+    String catalogPrefix = String.format("%s.%s.", ICEBERG_CATALOG_PREFIX, name);
+    Map<String, String> options = Arrays.stream(sparkConf.getAllWithPrefix(catalogPrefix))
+        .collect(Collectors.toMap(x -> x._1, x -> x._2));
+
+    if (options.isEmpty() && !name.equals(ICEBERG_DEFAULT_CATALOG)) {
+      throw new IllegalArgumentException(String.format("Cannot instantiate catalog %s. Incorrect Parameters", name));

Review comment:
       the problem arises when a eg `default.table` is passed into this method. It will end up constructing a Hive catalog w/ existing options called `default`. The Hive catalog tries to pull config from `Configuration` if not passed via the constructor. So we eat the namespace and using it in the catalog name rather than constructing a default catalog and a table identifier `default.table`.
   
   We could:
   * not allow catalog names to be specified in Spark2 sources
   * Stop using `Configuration` in hive tables
   * keep this the way it is
   
   I prefer option 2 but i am worried that may have unintended consequences elsewhere.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547438609



##########
File path: core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java
##########
@@ -68,45 +70,62 @@
  *
  * Note: The HadoopCatalog requires that the underlying file system supports atomic rename.
  */
-public class HadoopCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces {
+public class HadoopCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
 
   private static final String ICEBERG_HADOOP_WAREHOUSE_BASE = "iceberg/warehouse";
   private static final String TABLE_METADATA_FILE_EXTENSION = ".metadata.json";
   private static final Joiner SLASH = Joiner.on("/");
   private static final PathFilter TABLE_FILTER = path -> path.getName().endsWith(TABLE_METADATA_FILE_EXTENSION);
 
-  private final String catalogName;
-  private final Configuration conf;
-  private final String warehouseLocation;
-  private final FileSystem fs;
-  private final FileIO fileIO;
+  private String catalogName;
+  private Configuration conf;
+  private String warehouseLocation;
+  private FileSystem fs;
+  private FileIO fileIO;
+
+  public HadoopCatalog(){
+  }
 
   /**
    * The constructor of the HadoopCatalog. It uses the passed location as its warehouse directory.
    *
+   * @deprecated please use the no-arg constructor, setConf and initialize to construct the catalog
    * @param name The catalog name
    * @param conf The Hadoop configuration
    * @param warehouseLocation The location used as warehouse directory
    */
+  @Deprecated
   public HadoopCatalog(String name, Configuration conf, String warehouseLocation) {
     this(name, conf, warehouseLocation, Maps.newHashMap());
   }
 
   /**
    * The all-arg constructor of the HadoopCatalog.
    *
+   * @deprecated please use the no-arg constructor, setConf and initialize to construct the catalog
    * @param name The catalog name
    * @param conf The Hadoop configuration
    * @param warehouseLocation The location used as warehouse directory
    * @param properties catalog properties
    */
+  @Deprecated
   public HadoopCatalog(String name, Configuration conf, String warehouseLocation, Map<String, String> properties) {
     Preconditions.checkArgument(warehouseLocation != null && !warehouseLocation.equals(""),
         "no location provided for warehouse");
+    setConf(conf);
+    Map<String, String> props = new HashMap<>(properties);
+    props.putAll(properties);

Review comment:
       This adds properties to the map twice?
   
   We usually prefer `Maps.newHashMap()`, too.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547449720



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
##########
@@ -101,27 +94,7 @@
    */
   protected Catalog buildIcebergCatalog(String name, CaseInsensitiveStringMap options) {
     Configuration conf = SparkSession.active().sessionState().newHadoopConf();
-
-    String catalogImpl = options.get(CatalogProperties.CATALOG_IMPL);
-    if (catalogImpl != null) {
-      return CatalogUtil.loadCatalog(catalogImpl, name, options, conf);
-    }
-
-    String catalogType = options.getOrDefault(ICEBERG_CATALOG_TYPE, ICEBERG_CATALOG_TYPE_HIVE);
-    switch (catalogType.toLowerCase(Locale.ENGLISH)) {
-      case ICEBERG_CATALOG_TYPE_HIVE:
-        int clientPoolSize = options.getInt(CatalogProperties.HIVE_CLIENT_POOL_SIZE,
-            CatalogProperties.HIVE_CLIENT_POOL_SIZE_DEFAULT);
-        String uri = options.get(CatalogProperties.HIVE_URI);
-        return new HiveCatalog(name, uri, clientPoolSize, conf);
-
-      case ICEBERG_CATALOG_TYPE_HADOOP:
-        String warehouseLocation = options.get(CatalogProperties.WAREHOUSE_LOCATION);
-        return new HadoopCatalog(name, conf, warehouseLocation, options.asCaseSensitiveMap());
-
-      default:
-        throw new UnsupportedOperationException("Unknown catalog type: " + catalogType);
-    }
+    return CatalogUtil.buildIcebergCatalog(name, options.asCaseSensitiveMap(), conf);

Review comment:
       This makes config properties case sensitive. Should we convert to a Java case insensitive map?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rymurr commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rymurr commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r543670169



##########
File path: core/src/main/java/org/apache/iceberg/CatalogUtil.java
##########
@@ -169,6 +174,40 @@ public static Catalog loadCatalog(
     return catalog;
   }
 
+  public static Catalog buildIcebergCatalog(String name, Map<String, String> options, Configuration conf) {
+
+    String catalogImpl = options.get(CatalogProperties.CATALOG_IMPL);
+    if (catalogImpl != null) {
+      return CatalogUtil.loadCatalog(catalogImpl, name, options, conf);
+    }
+
+    String catalogType = options.getOrDefault(ICEBERG_CATALOG_TYPE, ICEBERG_CATALOG_TYPE_HIVE);
+    switch (catalogType.toLowerCase(Locale.ENGLISH)) {
+      case ICEBERG_CATALOG_TYPE_HIVE:
+        String clientPoolSize = options.getOrDefault(CatalogProperties.HIVE_CLIENT_POOL_SIZE,
+            Integer.toString(CatalogProperties.HIVE_CLIENT_POOL_SIZE_DEFAULT));
+        String uri = options.get(CatalogProperties.HIVE_URI);
+        return buildHiveCatalog(name, uri, Integer.parseInt(clientPoolSize), conf);
+      case ICEBERG_CATALOG_TYPE_HADOOP:
+        String warehouseLocation = options.get(CatalogProperties.WAREHOUSE_LOCATION);
+        return new HadoopCatalog(name, conf, warehouseLocation, options);
+
+      default:
+        throw new UnsupportedOperationException("Unknown catalog type: " + catalogType);
+    }
+  }
+
+  private static Catalog buildHiveCatalog(String name, String uri, int clientPoolSize, Configuration conf) {

Review comment:
       cool. I have switched both Hive and Hadoop catalogs to use this pattern. I left the old constructors and marked Deprecated.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rymurr commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rymurr commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547540747



##########
File path: spark2/src/test/java/org/apache/iceberg/spark/source/TestCustomCatalog.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.util.List;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public class TestCustomCatalog {
+  private static final String CATALOG_IMPL = String.format("%s.%s.%s", CustomCatalogs.ICEBERG_CATALOG_PREFIX,
+      CustomCatalogs.ICEBERG_DEFAULT_CATALOG, CatalogProperties.CATALOG_IMPL);
+  private static final String WAREHOUSE = String.format("%s.%s.%s", CustomCatalogs.ICEBERG_CATALOG_PREFIX,
+      CustomCatalogs.ICEBERG_DEFAULT_CATALOG, CatalogProperties.WAREHOUSE_LOCATION);
+  private static final String URI_KEY = String.format("%s.%s.%s", CustomCatalogs.ICEBERG_CATALOG_PREFIX,
+      CustomCatalogs.ICEBERG_DEFAULT_CATALOG, CatalogProperties.HIVE_URI);
+  private static final String URI_VAL = "thrift://localhost:12345"; // dummy uri
+  private static final String CATALOG_VAL = "org.apache.iceberg.spark.source.TestCatalog";
+  private static final TableIdentifier TABLE = TableIdentifier.of("default", "table");
+  private static final Schema SCHEMA = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(2, "data", Types.StringType.get())
+  );
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  File tableDir = null;
+  String tableLocation = null;
+  HadoopTables tables;
+
+  protected static SparkSession spark = null;
+
+  @BeforeClass
+  public static void startMetastoreAndSpark() {
+    spark = SparkSession.builder().master("local[2]").getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopMetastoreAndSpark() {
+    spark.stop();
+    spark = null;
+  }
+
+  @Before
+  public void setupTable() throws Exception {
+    this.tables = new HadoopTables(spark.sessionState().newHadoopConf());
+    this.tableDir = temp.newFolder();
+    tableDir.delete(); // created by table create
+    this.tableLocation = tableDir.toURI().toString();
+    tables.create(SCHEMA, PartitionSpec.unpartitioned(), String.format("%s/%s", tableLocation, TABLE.name()));
+  }
+
+  @After
+  public void removeTable() {
+    SparkConf sparkConf = spark.sparkContext().conf();
+    sparkConf.remove(CATALOG_IMPL);
+    sparkConf.remove(WAREHOUSE);
+    sparkConf.remove(URI_KEY);
+    tables.dropTable(String.format("%s/%s", tableLocation, TABLE.name()));
+    tableDir.delete();
+    CustomCatalogs.clearCache();
+  }
+
+  @Test
+  public void withSparkOptions() {
+
+    SparkConf sparkConf = spark.sparkContext().conf();
+    sparkConf.set(CATALOG_IMPL, CATALOG_VAL);
+    sparkConf.set(URI_KEY, URI_VAL);
+
+    List<SimpleRecord> expected = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+
+    Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
+    AssertHelpers.assertThrows("We have not set all properties", IllegalArgumentException.class, () ->

Review comment:
       done 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rymurr commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rymurr commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r542476855



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.hive.HiveCatalogs;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+
+public final class CustomCatalogs {
+  private static final Cache<String, Catalog> CATALOG_CACHE = Caffeine.newBuilder().softValues().build();
+
+  public static final String ICEBERG_CATALOG_PREFIX = "spark.sql.catalog.iceberg.";
+  public static final String ICEBERG_CATALOG_TYPE = "type";
+  public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
+  public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
+
+  private CustomCatalogs() {
+  }
+
+  /**
+   * Build an Iceberg {@link Catalog} to be used by this Spark source adapter.
+   *
+   * The cache is to facilitate reuse of catalogs, especially if wrapped in CachingCatalog. For non-Hive catalogs all
+   * custom parameters passed to the catalog are considered in the cache key. Hive catalogs only cache based on
+   * the Metastore URIs as per previous behaviour.
+   *
+   * @param options options from Spark
+   * @return an Iceberg catalog
+   */
+  public static Catalog buildIcebergCatalog(Map<String, String> options) {

Review comment:
       ok, I had originally aimed to support setting catalog options via `SparkConf` or via read options. But I guess there isn't much of a reason to support the 2nd use case. Have removed this feature.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547545947



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -61,4 +65,42 @@ public static void validatePartitionTransforms(PartitionSpec spec) {
           String.format("Cannot write using unsupported transforms: %s", unsupported));
     }
   }
+
+  /**
+   * A modified version of Spark's LookupCatalog.CatalogAndIdentifier.unapply
+   * Attempts to find the catalog and identifier a multipart identifier represents
+   * @param nameParts Multipart identifier representing a table
+   * @return The CatalogPlugin and Identifier for the table
+   */
+  public static <C, T> Pair<C, T> catalogAndIdentifier(List<String> nameParts,
+                                                       Function<String, C> catalogProvider,
+                                                       IdentiferFunction<T> identiferProvider,
+                                                       C defaultCatalog,
+                                                       String[] currentNamespace) {
+    Preconditions.checkArgument(!nameParts.isEmpty(),
+        "Cannot determine catalog and Identifier from empty name parts");
+
+    int lastElementIndex = nameParts.size() - 1;
+    String name = nameParts.get(lastElementIndex);
+
+    if (nameParts.size() == 1) {
+      // Only a single element, use current catalog and namespace
+      return Pair.of(defaultCatalog, identiferProvider.of(currentNamespace, name));
+    } else {
+      C catalog = catalogProvider.apply(nameParts.get(0));
+      if (catalog == null) {
+        // The first element was not a valid catalog, treat it like part of the namespace
+        String[] namespace =  nameParts.subList(0, lastElementIndex).toArray(new String[0]);

Review comment:
       Does this need `subList` if it is returning the entire list?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547439649



##########
File path: core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java
##########
@@ -68,45 +70,62 @@
  *
  * Note: The HadoopCatalog requires that the underlying file system supports atomic rename.
  */
-public class HadoopCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces {
+public class HadoopCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
 
   private static final String ICEBERG_HADOOP_WAREHOUSE_BASE = "iceberg/warehouse";
   private static final String TABLE_METADATA_FILE_EXTENSION = ".metadata.json";
   private static final Joiner SLASH = Joiner.on("/");
   private static final PathFilter TABLE_FILTER = path -> path.getName().endsWith(TABLE_METADATA_FILE_EXTENSION);
 
-  private final String catalogName;
-  private final Configuration conf;
-  private final String warehouseLocation;
-  private final FileSystem fs;
-  private final FileIO fileIO;
+  private String catalogName;
+  private Configuration conf;
+  private String warehouseLocation;
+  private FileSystem fs;
+  private FileIO fileIO;
+
+  public HadoopCatalog(){
+  }
 
   /**
    * The constructor of the HadoopCatalog. It uses the passed location as its warehouse directory.
    *
+   * @deprecated please use the no-arg constructor, setConf and initialize to construct the catalog
    * @param name The catalog name
    * @param conf The Hadoop configuration
    * @param warehouseLocation The location used as warehouse directory
    */
+  @Deprecated
   public HadoopCatalog(String name, Configuration conf, String warehouseLocation) {
     this(name, conf, warehouseLocation, Maps.newHashMap());
   }
 
   /**
    * The all-arg constructor of the HadoopCatalog.
    *
+   * @deprecated please use the no-arg constructor, setConf and initialize to construct the catalog
    * @param name The catalog name
    * @param conf The Hadoop configuration
    * @param warehouseLocation The location used as warehouse directory
    * @param properties catalog properties
    */
+  @Deprecated
   public HadoopCatalog(String name, Configuration conf, String warehouseLocation, Map<String, String> properties) {
     Preconditions.checkArgument(warehouseLocation != null && !warehouseLocation.equals(""),
         "no location provided for warehouse");
+    setConf(conf);
+    Map<String, String> props = new HashMap<>(properties);
+    props.putAll(properties);
+    props.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);
+    initialize(name, props);
+  }
 
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    String inputWarehouseLocation = properties.get(CatalogProperties.WAREHOUSE_LOCATION);
+    Preconditions.checkArgument(inputWarehouseLocation != null && !inputWarehouseLocation.equals(""),
+        "no location provided for warehouse");

Review comment:
       Nit: Error messages should use sentence case, with the first word capitalized. I'd also make it more clear that `warehouse` is the configuration key to set, like "Cannot create Hadoop catalog without 'warehouse' location".




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rymurr commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rymurr commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547992567



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -61,4 +65,42 @@ public static void validatePartitionTransforms(PartitionSpec spec) {
           String.format("Cannot write using unsupported transforms: %s", unsupported));
     }
   }
+
+  /**
+   * A modified version of Spark's LookupCatalog.CatalogAndIdentifier.unapply
+   * Attempts to find the catalog and identifier a multipart identifier represents
+   * @param nameParts Multipart identifier representing a table
+   * @return The CatalogPlugin and Identifier for the table
+   */
+  public static <C, T> Pair<C, T> catalogAndIdentifier(List<String> nameParts,
+                                                       Function<String, C> catalogProvider,
+                                                       IdentiferFunction<T> identiferProvider,
+                                                       C defaultCatalog,
+                                                       String[] currentNamespace) {
+    Preconditions.checkArgument(!nameParts.isEmpty(),
+        "Cannot determine catalog and Identifier from empty name parts");

Review comment:
       agreed, done. Note this also created a similar change in `Spark3Util`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rymurr commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rymurr commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547992532



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -61,4 +65,42 @@ public static void validatePartitionTransforms(PartitionSpec spec) {
           String.format("Cannot write using unsupported transforms: %s", unsupported));
     }
   }
+
+  /**
+   * A modified version of Spark's LookupCatalog.CatalogAndIdentifier.unapply
+   * Attempts to find the catalog and identifier a multipart identifier represents
+   * @param nameParts Multipart identifier representing a table
+   * @return The CatalogPlugin and Identifier for the table
+   */
+  public static <C, T> Pair<C, T> catalogAndIdentifier(List<String> nameParts,
+                                                       Function<String, C> catalogProvider,
+                                                       IdentiferFunction<T> identiferProvider,
+                                                       C defaultCatalog,

Review comment:
       It is the default catalog in `Spark3Util` on master. Is that wrong? current catalog seems more appropriate to me...

##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -61,4 +65,42 @@ public static void validatePartitionTransforms(PartitionSpec spec) {
           String.format("Cannot write using unsupported transforms: %s", unsupported));
     }
   }
+
+  /**
+   * A modified version of Spark's LookupCatalog.CatalogAndIdentifier.unapply
+   * Attempts to find the catalog and identifier a multipart identifier represents
+   * @param nameParts Multipart identifier representing a table
+   * @return The CatalogPlugin and Identifier for the table
+   */
+  public static <C, T> Pair<C, T> catalogAndIdentifier(List<String> nameParts,
+                                                       Function<String, C> catalogProvider,
+                                                       IdentiferFunction<T> identiferProvider,
+                                                       C defaultCatalog,
+                                                       String[] currentNamespace) {
+    Preconditions.checkArgument(!nameParts.isEmpty(),
+        "Cannot determine catalog and Identifier from empty name parts");

Review comment:
       agreed, done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r543518999



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.hive.HiveCatalogs;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+
+public final class CustomCatalogs {
+  private static final Cache<String, Catalog> CATALOG_CACHE = Caffeine.newBuilder().softValues().build();
+
+  public static final String ICEBERG_CATALOG_PREFIX = "spark.sql.catalog.iceberg.";
+  public static final String ICEBERG_CATALOG_TYPE = "type";
+  public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
+  public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
+
+  private CustomCatalogs() {
+  }
+
+  /**
+   * Build an Iceberg {@link Catalog} to be used by this Spark source adapter.
+   *
+   * The cache is to facilitate reuse of catalogs, especially if wrapped in CachingCatalog. For non-Hive catalogs all
+   * custom parameters passed to the catalog are considered in the cache key. Hive catalogs only cache based on
+   * the Metastore URIs as per previous behaviour.
+   *
+   * @param options options from Spark
+   * @return an Iceberg catalog
+   */
+  public static Catalog buildIcebergCatalog(Map<String, String> options) {
+    String name = "spark_source";
+    SparkConf sparkConf = SparkSession.active().sparkContext().getConf();
+    Map<String, String> sparkMap = Arrays.stream(sparkConf.getAllWithPrefix(ICEBERG_CATALOG_PREFIX))
+        .collect(Collectors.toMap(x -> x._1, x -> x._2));
+    sparkMap.putAll(options);
+    Configuration conf = SparkSession.active().sessionState().newHadoopConf();
+
+    String catalogImpl = sparkMap.get(CatalogProperties.CATALOG_IMPL);
+    if (catalogImpl != null) {
+      String cacheKey = options.entrySet()
+          .stream().map(x -> String.format("%s:%s", x.getKey(), x.getValue())).collect(Collectors.joining(";"));
+      return CATALOG_CACHE.get(cacheKey, x -> CatalogUtil.loadCatalog(catalogImpl, name, sparkMap, conf));

Review comment:
       +1 for decoupling and using reflection for HiveCatalog. I think that's the right solution.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r547548347



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/CustomCatalogs.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+
+public final class CustomCatalogs {
+  private static final Cache<Pair<SparkSession, String>, Catalog> CATALOG_CACHE = Caffeine.newBuilder().softValues().build();
+
+  public static final String ICEBERG_DEFAULT_CATALOG = "default_catalog";
+  public static final String ICEBERG_CATALOG_PREFIX = "spark.sql.catalog";
+
+  private CustomCatalogs() {
+  }
+
+  /**
+   * Build an Iceberg {@link Catalog} to be used by this Spark source adapter.
+   *
+   * <p>
+   *   The cache is to facilitate reuse of catalogs, especially if wrapped in CachingCatalog. For non-Hive catalogs all
+   *   custom parameters passed to the catalog are considered in the cache key. Hive catalogs only cache based on
+   *   the Metastore URIs as per previous behaviour.
+   * </p>
+   *
+   * @param spark Spark Session
+   * @param name Catalog Name
+   * @return an Iceberg catalog
+   */
+  public static Catalog buildIcebergCatalog(SparkSession spark, String name) {

Review comment:
       Sorry, I think _this_ method should be renamed to `loadCatalog`. The private method was fine as `buildCatalog`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r543520080



##########
File path: core/src/main/java/org/apache/iceberg/CatalogUtil.java
##########
@@ -169,6 +174,40 @@ public static Catalog loadCatalog(
     return catalog;
   }
 
+  public static Catalog buildIcebergCatalog(String name, Map<String, String> options, Configuration conf) {
+
+    String catalogImpl = options.get(CatalogProperties.CATALOG_IMPL);
+    if (catalogImpl != null) {
+      return CatalogUtil.loadCatalog(catalogImpl, name, options, conf);
+    }
+
+    String catalogType = options.getOrDefault(ICEBERG_CATALOG_TYPE, ICEBERG_CATALOG_TYPE_HIVE);
+    switch (catalogType.toLowerCase(Locale.ENGLISH)) {
+      case ICEBERG_CATALOG_TYPE_HIVE:
+        String clientPoolSize = options.getOrDefault(CatalogProperties.HIVE_CLIENT_POOL_SIZE,
+            Integer.toString(CatalogProperties.HIVE_CLIENT_POOL_SIZE_DEFAULT));
+        String uri = options.get(CatalogProperties.HIVE_URI);
+        return buildHiveCatalog(name, uri, Integer.parseInt(clientPoolSize), conf);
+      case ICEBERG_CATALOG_TYPE_HADOOP:
+        String warehouseLocation = options.get(CatalogProperties.WAREHOUSE_LOCATION);
+        return new HadoopCatalog(name, conf, warehouseLocation, options);
+
+      default:
+        throw new UnsupportedOperationException("Unknown catalog type: " + catalogType);
+    }
+  }
+
+  private static Catalog buildHiveCatalog(String name, String uri, int clientPoolSize, Configuration conf) {

Review comment:
       This works, but I think it would be better to make it so that the Hive catalog can be loaded using the normal no-arg constructor followed by `initialize` and `setConf`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#issuecomment-751890971


   Thanks @rymurr! Looks great so I merged it.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rymurr commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rymurr commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r549429917



##########
File path: spark2/src/test/java/org/apache/iceberg/spark/source/TestCustomCatalog.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.util.List;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalog.Catalog;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public class TestCustomCatalog {
+  private static final String CATALOG_IMPL = String.format("%s.%s.%s", CustomCatalogs.ICEBERG_CATALOG_PREFIX,
+      CustomCatalogs.ICEBERG_DEFAULT_CATALOG, CatalogProperties.CATALOG_IMPL);
+  private static final String WAREHOUSE = String.format("%s.%s.%s", CustomCatalogs.ICEBERG_CATALOG_PREFIX,
+      CustomCatalogs.ICEBERG_DEFAULT_CATALOG, CatalogProperties.WAREHOUSE_LOCATION);
+  private static final String URI_KEY = String.format("%s.%s.%s", CustomCatalogs.ICEBERG_CATALOG_PREFIX,
+      CustomCatalogs.ICEBERG_DEFAULT_CATALOG, CatalogProperties.HIVE_URI);
+  private static final String URI_VAL = "thrift://localhost:12345"; // dummy uri
+  private static final String CATALOG_VAL = "org.apache.iceberg.spark.source.TestCatalog";
+  private static final TableIdentifier TABLE = TableIdentifier.of("default", "table");
+  private static final Schema SCHEMA = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(2, "data", Types.StringType.get())
+  );
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  File tableDir = null;
+  String tableLocation = null;
+  HadoopTables tables;
+
+  protected static SparkSession spark = null;
+
+  @BeforeClass
+  public static void startMetastoreAndSpark() {
+    spark = SparkSession.builder().master("local[2]").getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopMetastoreAndSpark() {
+    spark.stop();
+    spark = null;
+  }
+
+  @Before
+  public void setupTable() throws Exception {
+    SparkConf sparkConf = spark.sparkContext().conf();
+    sparkConf.set(
+        String.format("%s.%s", CustomCatalogs.ICEBERG_CATALOG_PREFIX, CustomCatalogs.ICEBERG_DEFAULT_CATALOG),
+        Catalog.class.getName());
+    this.tables = new HadoopTables(spark.sessionState().newHadoopConf());
+    this.tableDir = temp.newFolder();
+    tableDir.delete(); // created by table create
+    this.tableLocation = tableDir.toURI().toString();
+    tables.create(SCHEMA, PartitionSpec.unpartitioned(), String.format("%s/%s", tableLocation, TABLE.name()));
+  }
+
+  @After
+  public void removeTable() {
+    SparkConf sparkConf = spark.sparkContext().conf();
+    sparkConf.remove(CATALOG_IMPL);
+    sparkConf.remove(WAREHOUSE);
+    sparkConf.remove(URI_KEY);
+    tables.dropTable(String.format("%s/%s", tableLocation, TABLE.name()));
+    tableDir.delete();
+    CustomCatalogs.clearCache();
+  }
+
+  @Test
+  public void withSparkOptions() {

Review comment:
       misunderstood your comment. Added a test for that.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#discussion_r543528924



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -24,16 +24,16 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.hadoop.HadoopTables;
-import org.apache.iceberg.hive.HiveCatalog;
-import org.apache.iceberg.hive.HiveCatalogs;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkUtil;
 import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.Pair;

Review comment:
       Looks like some imports might be stale.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rymurr commented on pull request #1875: Allow Spark2 DataFrame to use a custom catalog

Posted by GitBox <gi...@apache.org>.
rymurr commented on pull request #1875:
URL: https://github.com/apache/iceberg/pull/1875#issuecomment-749815318


   Thanks @rdblue I've cleaned up based on your comments. I think there is a bit still around how a catalog is chosen but getting close


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org