Posted to commits@flink.apache.org by cz...@apache.org on 2023/01/13 06:21:54 UTC

[flink-table-store] branch release-0.3 updated: [FLINK-30620] Table Store Hive catalog supports specifying custom Hive metastore client

This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch release-0.3
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/release-0.3 by this push:
     new 7a03fcc2 [FLINK-30620] Table Store Hive catalog supports specifying custom Hive metastore client
7a03fcc2 is described below

commit 7a03fcc2b63aa8ca502c244df8da91a67588f7b5
Author: tsreaper <ts...@gmail.com>
AuthorDate: Fri Jan 13 14:19:57 2023 +0800

    [FLINK-30620] Table Store Hive catalog supports specifying custom Hive metastore client
    
    This closes #475.
    
    (cherry picked from commit 6144fa3a65356322b800d0b392099f0ac92043f3)
---
 docs/content/docs/how-to/creating-catalogs.md      |   2 +
 .../apache/flink/table/store/hive/HiveCatalog.java |  17 +++-
 .../flink/table/store/hive/HiveCatalogFactory.java |  28 +++++-
 .../flink/table/store/hive/HiveCatalogLock.java    |  12 ++-
 .../hive/CustomHiveMetastoreClientITCase.java      | 101 +++++++++++++++++++++
 .../table/store/hive/TestHiveMetaStoreClient.java  |  56 ++++++++++++
 .../store/spark/SparkCaseSensitiveConverter.java   |  12 +--
 .../store/spark/SparkCaseSensitiveConverter.java   |  12 +--
 8 files changed, 214 insertions(+), 26 deletions(-)
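
For context, here is how a user exercises the new option end to end; a minimal SQL sketch based on the ITCase added in this commit (the thrift URI, warehouse path, and client class name are placeholders):

    CREATE CATALOG my_hive WITH (
      'type' = 'table-store',
      'metastore' = 'hive',
      'uri' = 'thrift://<host>:<port>',
      'warehouse' = '/path/to/warehouse',
      -- the class named here must directly implement org.apache.hadoop.hive.metastore.IMetaStoreClient
      'metastore.client.class' = 'com.example.MyMetaStoreClient'
    );

When the option is omitted it defaults to org.apache.hadoop.hive.metastore.HiveMetaStoreClient, so existing catalogs are unaffected.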

diff --git a/docs/content/docs/how-to/creating-catalogs.md b/docs/content/docs/how-to/creating-catalogs.md
index ea2712ef..59da3cae 100644
--- a/docs/content/docs/how-to/creating-catalogs.md
+++ b/docs/content/docs/how-to/creating-catalogs.md
@@ -31,6 +31,8 @@ Table Store catalogs currently support two types of metastores:
 * `filesystem` metastore (default), which stores both metadata and table files in filesystems.
 * `hive` metastore, which additionally stores metadata in Hive metastore. Users can directly access the tables from Hive.
 
+See [CatalogOptions]({{< ref "docs/maintenance/configurations#catalogoptions" >}}) for detailed options when creating a catalog.
+
 ## Creating a Catalog with Filesystem Metastore
 
 {{< tabs "filesystem-metastore-example" >}}
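
For the default filesystem metastore mentioned above, a minimal catalog DDL needs only the warehouse path; a sketch with an illustrative path:

    CREATE CATALOG my_catalog WITH (
      'type' = 'table-store',
      'warehouse' = 'file:///tmp/table_store'
    );
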
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
index 11982c63..90269ee4 100644
--- a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
@@ -52,6 +52,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.store.CatalogOptions.LOCK_ENABLED;
@@ -75,17 +76,19 @@ public class HiveCatalog extends AbstractCatalog {
             "org.apache.flink.table.store.hive.TableStoreHiveStorageHandler";
 
     private final HiveConf hiveConf;
+    private final String clientClassName;
     private final IMetaStoreClient client;
 
-    public HiveCatalog(Configuration hadoopConfig) {
+    public HiveCatalog(Configuration hadoopConfig, String clientClassName) {
         this.hiveConf = new HiveConf(hadoopConfig, HiveConf.class);
-        this.client = createClient(hiveConf);
+        this.clientClassName = clientClassName;
+        this.client = createClient(hiveConf, clientClassName);
     }
 
     @Override
     public Optional<CatalogLock.Factory> lockFactory() {
         return lockEnabled()
-                ? Optional.of(HiveCatalogLock.createFactory(hiveConf))
+                ? Optional.of(HiveCatalogLock.createFactory(hiveConf, clientClassName))
                 : Optional.empty();
     }
 
@@ -408,12 +411,16 @@ public class HiveCatalog extends AbstractCatalog {
         return Lock.fromCatalog(lock, tablePath);
     }
 
-    static IMetaStoreClient createClient(HiveConf hiveConf) {
+    static IMetaStoreClient createClient(HiveConf hiveConf, String clientClassName) {
         IMetaStoreClient client;
         try {
             client =
                     RetryingMetaStoreClient.getProxy(
-                            hiveConf, tbl -> null, HiveMetaStoreClient.class.getName());
+                            hiveConf,
+                            tbl -> null,
+                            new ConcurrentHashMap<>(),
+                            clientClassName,
+                            true);
         } catch (MetaException e) {
             throw new RuntimeException(e);
         }
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java
index 2488c130..2b46e81f 100644
--- a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java
@@ -18,20 +18,33 @@
 
 package org.apache.flink.table.store.hive;
 
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.store.CatalogOptions;
 import org.apache.flink.table.store.file.catalog.Catalog;
 import org.apache.flink.table.store.file.catalog.CatalogFactory;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.hive.conf.HiveConf;
-
-import static org.apache.flink.table.store.CatalogOptions.URI;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 
 /** Factory to create {@link HiveCatalog}. */
 public class HiveCatalogFactory implements CatalogFactory {
 
     private static final String IDENTIFIER = "hive";
 
+    private static final ConfigOption<String> METASTORE_CLIENT_CLASS =
+            ConfigOptions.key("metastore.client.class")
+                    .stringType()
+                    .defaultValue(HiveMetaStoreClient.class.getName())
+                    .withDescription(
+                            "Class name of Hive metastore client.\n"
+                                    + "NOTE: This class must directly implement "
+                                    + IMetaStoreClient.class.getName()
+                                    + ".");
+
     @Override
     public String identifier() {
         return IDENTIFIER;
@@ -41,13 +54,18 @@ public class HiveCatalogFactory implements CatalogFactory {
     public Catalog create(String warehouse, Configuration options) {
         String uri =
                 Preconditions.checkNotNull(
-                        options.get(URI),
-                        URI.key() + " must be set for table store " + IDENTIFIER + " catalog");
+                        options.get(CatalogOptions.URI),
+                        CatalogOptions.URI.key()
+                                + " must be set for table store "
+                                + IDENTIFIER
+                                + " catalog");
+
         org.apache.hadoop.conf.Configuration hadoopConfig =
                 new org.apache.hadoop.conf.Configuration();
         options.toMap().forEach(hadoopConfig::set);
         hadoopConfig.set(HiveConf.ConfVars.METASTOREURIS.varname, uri);
         hadoopConfig.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehouse);
-        return new HiveCatalog(hadoopConfig);
+
+        return new HiveCatalog(hadoopConfig, options.get(METASTORE_CLIENT_CLASS));
     }
 }
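
RetryingMetaStoreClient instantiates the class named by metastore.client.class reflectively, so a custom client normally extends HiveMetaStoreClient and exposes the same constructors. A minimal sketch, assuming the proxy resolves the (HiveConf, HiveMetaHookLoader, Boolean) constructor just as the TestHiveMetaStoreClient added below does (package and class name are hypothetical):

    package com.example;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
    import org.apache.hadoop.hive.metastore.IMetaStoreClient;
    import org.apache.hadoop.hive.metastore.api.MetaException;

    /** Hypothetical custom client; authentication, caching or auditing overrides would go here. */
    public class MyMetaStoreClient extends HiveMetaStoreClient implements IMetaStoreClient {
        public MyMetaStoreClient(HiveConf conf, HiveMetaHookLoader hookLoader, Boolean allowEmbedded)
                throws MetaException {
            super(conf, hookLoader, allowEmbedded);
        }
    }
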
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogLock.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogLock.java
index 64352278..e6960f36 100644
--- a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogLock.java
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogLock.java
@@ -112,8 +112,8 @@ public class HiveCatalogLock implements CatalogLock {
     }
 
     /** Create a hive lock factory. */
-    public static CatalogLock.Factory createFactory(HiveConf hiveConf) {
-        return new HiveCatalogLockFactory(hiveConf);
+    public static CatalogLock.Factory createFactory(HiveConf hiveConf, String clientClassName) {
+        return new HiveCatalogLockFactory(hiveConf, clientClassName);
     }
 
     private static class HiveCatalogLockFactory implements CatalogLock.Factory {
@@ -121,16 +121,20 @@ public class HiveCatalogLock implements CatalogLock {
         private static final long serialVersionUID = 1L;
 
         private final SerializableHiveConf hiveConf;
+        private final String clientClassName;
 
-        public HiveCatalogLockFactory(HiveConf hiveConf) {
+        public HiveCatalogLockFactory(HiveConf hiveConf, String clientClassName) {
             this.hiveConf = new SerializableHiveConf(hiveConf);
+            this.clientClassName = clientClassName;
         }
 
         @Override
         public CatalogLock create() {
             HiveConf conf = hiveConf.conf();
             return new HiveCatalogLock(
-                    HiveCatalog.createClient(conf), checkMaxSleep(conf), acquireTimeout(conf));
+                    HiveCatalog.createClient(conf, clientClassName),
+                    checkMaxSleep(conf),
+                    acquireTimeout(conf));
         }
     }
 
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/CustomHiveMetastoreClientITCase.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/CustomHiveMetastoreClientITCase.java
new file mode 100644
index 00000000..d979bbdf
--- /dev/null
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/CustomHiveMetastoreClientITCase.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.hive;
+
+import org.apache.flink.connectors.hive.FlinkEmbeddedHiveRunner;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.annotations.HiveSQL;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/** IT cases for {@link HiveCatalog} with custom Hive metastore client. */
+@RunWith(FlinkEmbeddedHiveRunner.class)
+public class CustomHiveMetastoreClientITCase {
+
+    @Rule public TemporaryFolder folder = new TemporaryFolder();
+
+    private String path;
+    private TableEnvironment tEnv;
+
+    @HiveSQL(files = {})
+    private static HiveShell hiveShell;
+
+    @Before
+    public void before() throws Exception {
+        hiveShell.execute("CREATE DATABASE IF NOT EXISTS test_db");
+        hiveShell.execute("USE test_db");
+    }
+
+    @After
+    public void after() {
+        hiveShell.execute("DROP DATABASE IF EXISTS test_db CASCADE");
+    }
+
+    @Test
+    public void testCustomMetastoreClient() throws Exception {
+        path = folder.newFolder().toURI().toString();
+        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+        tEnv = TableEnvironmentImpl.create(settings);
+        tEnv.executeSql(
+                        String.join(
+                                "\n",
+                                "CREATE CATALOG my_hive WITH (",
+                                "  'type' = 'table-store',",
+                                "  'metastore' = 'hive',",
+                                "  'uri' = '',",
+                                "  'warehouse' = '" + path + "',",
+                                "  'metastore.client.class' = '"
+                                        + TestHiveMetaStoreClient.class.getName()
+                                        + "'",
+                                ")"))
+                .await();
+        tEnv.executeSql("USE CATALOG my_hive").await();
+        Assert.assertEquals(
+                Arrays.asList(
+                        Row.of("default"),
+                        Row.of("test_db"),
+                        Row.of(TestHiveMetaStoreClient.MOCK_DATABASE)),
+                collect("SHOW DATABASES"));
+    }
+
+    private List<Row> collect(String sql) throws Exception {
+        List<Row> result = new ArrayList<>();
+        try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {
+            while (it.hasNext()) {
+                result.add(it.next());
+            }
+        }
+        return result;
+    }
+}
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/TestHiveMetaStoreClient.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/TestHiveMetaStoreClient.java
new file mode 100644
index 00000000..a8e5d316
--- /dev/null
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/TestHiveMetaStoreClient.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.hive;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** A {@link HiveMetaStoreClient} to test custom Hive metastore client. */
+public class TestHiveMetaStoreClient extends HiveMetaStoreClient implements IMetaStoreClient {
+
+    public static final String MOCK_DATABASE = "test_mock_database";
+
+    public TestHiveMetaStoreClient(HiveConf conf) throws MetaException {
+        super(conf);
+    }
+
+    public TestHiveMetaStoreClient(HiveConf conf, HiveMetaHookLoader hookLoader)
+            throws MetaException {
+        super(conf, hookLoader);
+    }
+
+    public TestHiveMetaStoreClient(
+            HiveConf conf, HiveMetaHookLoader hookLoader, Boolean allowEmbedded)
+            throws MetaException {
+        super(conf, hookLoader, allowEmbedded);
+    }
+
+    @Override
+    public List<String> getAllDatabases() throws MetaException {
+        List<String> result = new ArrayList<>(super.getAllDatabases());
+        result.add(MOCK_DATABASE);
+        return result;
+    }
+}
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCaseSensitiveConverter.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCaseSensitiveConverter.java
index 562834f9..4329efca 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCaseSensitiveConverter.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCaseSensitiveConverter.java
@@ -35,14 +35,14 @@ public class SparkCaseSensitiveConverter {
     private static final Set<String> CASE_SENSITIVE_KEYS = new HashSet<>();
 
     // OSS access verification
-    private static final String ACCESS_KEY_ID = "fs.oss.accessKeyId";
-    private static final String ACCESS_KEY_SECRET = "fs.oss.accessKeySecret";
-    private static final String SECURITY_TOKEN = "fs.oss.securityToken";
+    private static final String OSS_ACCESS_KEY_ID = "fs.oss.accessKeyId";
+    private static final String OSS_ACCESS_KEY_SECRET = "fs.oss.accessKeySecret";
+    private static final String OSS_SECURITY_TOKEN = "fs.oss.securityToken";
 
     static {
-        CASE_SENSITIVE_KEYS.add(ACCESS_KEY_ID);
-        CASE_SENSITIVE_KEYS.add(ACCESS_KEY_SECRET);
-        CASE_SENSITIVE_KEYS.add(SECURITY_TOKEN);
+        CASE_SENSITIVE_KEYS.add(OSS_ACCESS_KEY_ID);
+        CASE_SENSITIVE_KEYS.add(OSS_ACCESS_KEY_SECRET);
+        CASE_SENSITIVE_KEYS.add(OSS_SECURITY_TOKEN);
     }
 
     public static Map<String, String> convert(Map<String, String> caseInsensitiveOptions) {
diff --git a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkCaseSensitiveConverter.java b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkCaseSensitiveConverter.java
index e662430d..80736c47 100644
--- a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkCaseSensitiveConverter.java
+++ b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkCaseSensitiveConverter.java
@@ -38,14 +38,14 @@ public class SparkCaseSensitiveConverter {
     private static final Set<String> CASE_SENSITIVE_KEYS = new HashSet<>();
 
     // OSS access verification
-    private static final String ACCESS_KEY_ID = "fs.oss.accessKeyId";
-    private static final String ACCESS_KEY_SECRET = "fs.oss.accessKeySecret";
-    private static final String SECURITY_TOKEN = "fs.oss.securityToken";
+    private static final String OSS_ACCESS_KEY_ID = "fs.oss.accessKeyId";
+    private static final String OSS_ACCESS_KEY_SECRET = "fs.oss.accessKeySecret";
+    private static final String OSS_SECURITY_TOKEN = "fs.oss.securityToken";
 
     static {
-        CASE_SENSITIVE_KEYS.add(ACCESS_KEY_ID);
-        CASE_SENSITIVE_KEYS.add(ACCESS_KEY_SECRET);
-        CASE_SENSITIVE_KEYS.add(SECURITY_TOKEN);
+        CASE_SENSITIVE_KEYS.add(OSS_ACCESS_KEY_ID);
+        CASE_SENSITIVE_KEYS.add(OSS_ACCESS_KEY_SECRET);
+        CASE_SENSITIVE_KEYS.add(OSS_SECURITY_TOKEN);
     }
 
     public static Map<String, String> convert(DataSourceOptions options) {