You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/06/28 01:51:37 UTC

[iceberg] branch master updated: Spark: Validate HMS uri in SparkSessionCatalog (#5134)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new c27efc075 Spark: Validate HMS uri in SparkSessionCatalog (#5134)
c27efc075 is described below

commit c27efc075b6bc56cc1fffd1937b547518e68fc17
Author: Yufei Gu <yu...@apache.org>
AuthorDate: Mon Jun 27 18:51:33 2022 -0700

    Spark: Validate HMS uri in SparkSessionCatalog (#5134)
---
 .../apache/iceberg/spark/SparkSessionCatalog.java  | 24 ++++++++
 .../iceberg/spark/TestSparkSessionCatalog.java     | 66 ++++++++++++++++++++++
 2 files changed, 90 insertions(+)

diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
index edc544762..483b0d4c6 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
@@ -20,9 +20,13 @@
 package org.apache.iceberg.spark;
 
 import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.spark.source.HasIcebergCatalog;
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -259,6 +263,10 @@ public class SparkSessionCatalog<T extends TableCatalog & SupportsNamespaces>
 
   @Override
   public final void initialize(String name, CaseInsensitiveStringMap options) {
+    if (options.containsKey("type") && options.get("type").equalsIgnoreCase("hive")) {
+      validateHmsUri(options.get(CatalogProperties.URI));
+    }
+
     this.catalogName = name;
     this.icebergCatalog = buildSparkCatalog(name, options);
     if (icebergCatalog instanceof StagingTableCatalog) {
@@ -270,6 +278,22 @@ public class SparkSessionCatalog<T extends TableCatalog & SupportsNamespaces>
     this.createOrcAsIceberg = options.getBoolean("orc-enabled", createOrcAsIceberg);
   }
 
+  /**
+   * Fails with IllegalArgumentException when the catalog's Hive metastore URI and the one in the
+   * Spark session's Hadoop conf are both set but differ; otherwise does nothing.
+   */
+  private void validateHmsUri(String catalogHmsUri) {
+    if (catalogHmsUri != null) {
+      // session-side URI comes from a Hadoop conf built off the active Spark session; null if unset
+      Configuration hadoopConf = SparkSession.active().sessionState().newHadoopConf();
+      String sessionHmsUri = hadoopConf.get(HiveConf.ConfVars.METASTOREURIS.varname, null);
+      boolean consistent = sessionHmsUri == null || catalogHmsUri.equals(sessionHmsUri);
+      Preconditions.checkArgument(consistent,
+          "Inconsistent Hive metastore URIs: %s (Spark session) != %s (spark_catalog)",
+          sessionHmsUri, catalogHmsUri);
+    }
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public void setDelegateCatalog(CatalogPlugin sparkSessionCatalog) {
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkSessionCatalog.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkSessionCatalog.java
new file mode 100644
index 000000000..3ab2d6b23
--- /dev/null
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkSessionCatalog.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
+
+/** Checks that SparkSessionCatalog validates its Hive metastore URI against the Spark session's. */
+public class TestSparkSessionCatalog extends SparkTestBase {
+  @Test
+  public void testValidateHmsUri() {
+    String envHmsUriKey = "spark.hadoop." + METASTOREURIS.varname;
+    String catalogHmsUriKey = "spark.sql.catalog.spark_catalog.uri";
+    String hmsUri = hiveConf.get(METASTOREURIS.varname);
+    spark.conf().set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog");
+    spark.conf().set("spark.sql.catalog.spark_catalog.type", "hive");
+
+    // HMS URIs match: the session catalog loads and reports "default" as its default namespace
+    spark.sessionState().catalogManager().reset();
+    spark.conf().set(envHmsUriKey, hmsUri);
+    spark.conf().set(catalogHmsUriKey, hmsUri);
+    Assert.assertEquals("default", spark.sessionState().catalogManager().v2SessionCatalog().defaultNamespace()[0]);
+
+    // HMS URIs don't match: loading the session catalog must fail with the validation message
+    spark.sessionState().catalogManager().reset();
+    String catalogHmsUri = "RandomString";
+    spark.conf().set(envHmsUriKey, hmsUri);
+    spark.conf().set(catalogHmsUriKey, catalogHmsUri);
+    IllegalArgumentException exception = Assert.assertThrows(IllegalArgumentException.class,
+        () -> spark.sessionState().catalogManager().v2SessionCatalog());
+    String errorMessage = String.format("Inconsistent Hive metastore URIs: %s (Spark session) != %s (spark_catalog)",
+        hmsUri, catalogHmsUri);
+    Assert.assertEquals(errorMessage, exception.getMessage());
+
+    // no env HMS URI, only catalog HMS URI: nothing to compare against, so the catalog loads
+    spark.sessionState().catalogManager().reset();
+    spark.conf().set(catalogHmsUriKey, hmsUri);
+    spark.conf().unset(envHmsUriKey);
+    Assert.assertEquals("default", spark.sessionState().catalogManager().v2SessionCatalog().defaultNamespace()[0]);
+
+    // no catalog HMS URI, only env HMS URI: nothing to compare against, so the catalog loads
+    spark.sessionState().catalogManager().reset();
+    spark.conf().set(envHmsUriKey, hmsUri);
+    spark.conf().unset(catalogHmsUriKey);
+    Assert.assertEquals("default", spark.sessionState().catalogManager().v2SessionCatalog().defaultNamespace()[0]);
+  }
+}