You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/06/28 01:51:37 UTC
[iceberg] branch master updated: Spark: Validate HMS uri in SparkSessionCatalog (#5134)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new c27efc075 Spark: Validate HMS uri in SparkSessionCatalog (#5134)
c27efc075 is described below
commit c27efc075b6bc56cc1fffd1937b547518e68fc17
Author: Yufei Gu <yu...@apache.org>
AuthorDate: Mon Jun 27 18:51:33 2022 -0700
Spark: Validate HMS uri in SparkSessionCatalog (#5134)
---
.../apache/iceberg/spark/SparkSessionCatalog.java | 24 ++++++++
.../iceberg/spark/TestSparkSessionCatalog.java | 66 ++++++++++++++++++++++
2 files changed, 90 insertions(+)
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
index edc544762..483b0d4c6 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
@@ -20,9 +20,13 @@
package org.apache.iceberg.spark;
import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.source.HasIcebergCatalog;
+import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -259,6 +263,10 @@ public class SparkSessionCatalog<T extends TableCatalog & SupportsNamespaces>
@Override
public final void initialize(String name, CaseInsensitiveStringMap options) {
+ if (options.containsKey("type") && options.get("type").equalsIgnoreCase("hive")) {
+ validateHmsUri(options.get(CatalogProperties.URI));
+ }
+
this.catalogName = name;
this.icebergCatalog = buildSparkCatalog(name, options);
if (icebergCatalog instanceof StagingTableCatalog) {
@@ -270,6 +278,22 @@ public class SparkSessionCatalog<T extends TableCatalog & SupportsNamespaces>
this.createOrcAsIceberg = options.getBoolean("orc-enabled", createOrcAsIceberg);
}
+ private void validateHmsUri(String catalogHmsUri) { // Rejects a catalog "uri" that differs from the active Spark session's Hive metastore URI.
+ if (catalogHmsUri == null) {
+ return; // no catalog-level URI configured: nothing to compare
+ }
+
+ Configuration conf = SparkSession.active().sessionState().newHadoopConf(); // Hadoop conf of the currently active Spark session
+ String envHmsUri = conf.get(HiveConf.ConfVars.METASTOREURIS.varname, null); // session-level HMS URI setting; null when unset
+ if (envHmsUri == null) {
+ return; // session has no HMS URI: nothing to conflict with
+ }
+
+ Preconditions.checkArgument(catalogHmsUri.equals(envHmsUri), // exact string match; NOTE(review): equivalent URI lists written differently (order, spaces) are rejected
+ "Inconsistent Hive metastore URIs: %s (Spark session) != %s (spark_catalog)",
+ envHmsUri, catalogHmsUri); // throws IllegalArgumentException on mismatch
+ }
+
@Override
@SuppressWarnings("unchecked")
public void setDelegateCatalog(CatalogPlugin sparkSessionCatalog) {
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkSessionCatalog.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkSessionCatalog.java
new file mode 100644
index 000000000..3ab2d6b23
--- /dev/null
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkSessionCatalog.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
+
+public class TestSparkSessionCatalog extends SparkTestBase { // Tests the HMS URI consistency check in SparkSessionCatalog.initialize for type=hive
+ @Test
+ public void testValidateHmsUri() {
+ String envHmsUriKey = "spark.hadoop." + METASTOREURIS.varname; // session-level HMS URI key (spark.hadoop.-prefixed)
+ String catalogHmsUriKey = "spark.sql.catalog.spark_catalog.uri"; // catalog-level HMS URI key
+ String hmsUri = hiveConf.get(METASTOREURIS.varname); // hiveConf comes from SparkTestBase (presumably the test metastore's config)
+
+ spark.conf().set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog");
+ spark.conf().set("spark.sql.catalog.spark_catalog.type", "hive"); // the URI check only runs for type=hive
+
+ // HMS URIs match: initialization succeeds
+ spark.sessionState().catalogManager().reset(); // clear cached catalogs so the next lookup re-initializes spark_catalog
+ spark.conf().set(envHmsUriKey, hmsUri);
+ spark.conf().set(catalogHmsUriKey, hmsUri);
+ Assert.assertTrue(spark.sessionState().catalogManager().v2SessionCatalog().defaultNamespace()[0].equals("default")); // lookup initializes the catalog; reaching here means it passed
+
+ // HMS URIs don't match: initialization must fail
+ spark.sessionState().catalogManager().reset();
+ String catalogHmsUri = "RandomString";
+ spark.conf().set(envHmsUriKey, hmsUri);
+ spark.conf().set(catalogHmsUriKey, catalogHmsUri);
+ IllegalArgumentException exception = Assert.assertThrows(IllegalArgumentException.class,
+ () -> spark.sessionState().catalogManager().v2SessionCatalog());
+ String errorMessage = String.format("Inconsistent Hive metastore URIs: %s (Spark session) != %s (spark_catalog)",
+ hmsUri, catalogHmsUri); // must mirror the message format in SparkSessionCatalog.validateHmsUri
+ Assert.assertEquals(errorMessage, exception.getMessage());
+
+ // only the catalog HMS URI is set in the session conf: initialization still succeeds
+ spark.sessionState().catalogManager().reset();
+ spark.conf().set(catalogHmsUriKey, hmsUri);
+ spark.conf().unset(envHmsUriKey);
+ Assert.assertTrue(spark.sessionState().catalogManager().v2SessionCatalog().defaultNamespace()[0].equals("default"));
+
+ // only the session HMS URI is set: no catalog URI, so the check is skipped
+ spark.sessionState().catalogManager().reset();
+ spark.conf().set(envHmsUriKey, hmsUri);
+ spark.conf().unset(catalogHmsUriKey);
+ Assert.assertTrue(spark.sessionState().catalogManager().v2SessionCatalog().defaultNamespace()[0].equals("default"));
+ }
+}