You are viewing a plain text version of this content. The canonical link for it was a hyperlink on the original page and was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by cz...@apache.org on 2023/01/13 06:21:54 UTC
[flink-table-store] branch release-0.3 updated: [FLINK-30620] Table Store Hive catalog supports specifying custom Hive metastore client
This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch release-0.3
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/release-0.3 by this push:
new 7a03fcc2 [FLINK-30620] Table Store Hive catalog supports specifying custom Hive metastore client
7a03fcc2 is described below
commit 7a03fcc2b63aa8ca502c244df8da91a67588f7b5
Author: tsreaper <ts...@gmail.com>
AuthorDate: Fri Jan 13 14:19:57 2023 +0800
[FLINK-30620] Table Store Hive catalog supports specifying custom Hive metastore client
This closes #475.
(cherry picked from commit 6144fa3a65356322b800d0b392099f0ac92043f3)
---
docs/content/docs/how-to/creating-catalogs.md | 2 +
.../apache/flink/table/store/hive/HiveCatalog.java | 17 +++-
.../flink/table/store/hive/HiveCatalogFactory.java | 28 +++++-
.../flink/table/store/hive/HiveCatalogLock.java | 12 ++-
.../hive/CustomHiveMetastoreClientITCase.java | 101 +++++++++++++++++++++
.../table/store/hive/TestHiveMetaStoreClient.java | 56 ++++++++++++
.../store/spark/SparkCaseSensitiveConverter.java | 12 +--
.../store/spark/SparkCaseSensitiveConverter.java | 12 +--
8 files changed, 214 insertions(+), 26 deletions(-)
diff --git a/docs/content/docs/how-to/creating-catalogs.md b/docs/content/docs/how-to/creating-catalogs.md
index ea2712ef..59da3cae 100644
--- a/docs/content/docs/how-to/creating-catalogs.md
+++ b/docs/content/docs/how-to/creating-catalogs.md
@@ -31,6 +31,8 @@ Table Store catalogs currently support two types of metastores:
* `filesystem` metastore (default), which stores both metadata and table files in filesystems.
* `hive` metastore, which additionally stores metadata in Hive metastore. Users can directly access the tables from Hive.
+See [CatalogOptions]({{< ref "docs/maintenance/configurations#catalogoptions" >}}) for detailed options when creating a catalog.
+
## Creating a Catalog with Filesystem Metastore
{{< tabs "filesystem-metastore-example" >}}
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
index 11982c63..90269ee4 100644
--- a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
@@ -52,6 +52,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import static org.apache.flink.table.store.CatalogOptions.LOCK_ENABLED;
@@ -75,17 +76,19 @@ public class HiveCatalog extends AbstractCatalog {
"org.apache.flink.table.store.hive.TableStoreHiveStorageHandler";
private final HiveConf hiveConf;
+ private final String clientClassName;
private final IMetaStoreClient client;
- public HiveCatalog(Configuration hadoopConfig) {
+ public HiveCatalog(Configuration hadoopConfig, String clientClassName) {
this.hiveConf = new HiveConf(hadoopConfig, HiveConf.class);
- this.client = createClient(hiveConf);
+ this.clientClassName = clientClassName;
+ this.client = createClient(hiveConf, clientClassName);
}
@Override
public Optional<CatalogLock.Factory> lockFactory() {
return lockEnabled()
- ? Optional.of(HiveCatalogLock.createFactory(hiveConf))
+ ? Optional.of(HiveCatalogLock.createFactory(hiveConf, clientClassName))
: Optional.empty();
}
@@ -408,12 +411,16 @@ public class HiveCatalog extends AbstractCatalog {
return Lock.fromCatalog(lock, tablePath);
}
- static IMetaStoreClient createClient(HiveConf hiveConf) {
+ static IMetaStoreClient createClient(HiveConf hiveConf, String clientClassName) {
IMetaStoreClient client;
try {
client =
RetryingMetaStoreClient.getProxy(
- hiveConf, tbl -> null, HiveMetaStoreClient.class.getName());
+ hiveConf,
+ tbl -> null,
+ new ConcurrentHashMap<>(),
+ clientClassName,
+ true);
} catch (MetaException e) {
throw new RuntimeException(e);
}
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java
index 2488c130..2b46e81f 100644
--- a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java
@@ -18,20 +18,33 @@
package org.apache.flink.table.store.hive;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.store.CatalogOptions;
import org.apache.flink.table.store.file.catalog.Catalog;
import org.apache.flink.table.store.file.catalog.CatalogFactory;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.hive.conf.HiveConf;
-
-import static org.apache.flink.table.store.CatalogOptions.URI;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
/** Factory to create {@link HiveCatalog}. */
public class HiveCatalogFactory implements CatalogFactory {
private static final String IDENTIFIER = "hive";
+ private static final ConfigOption<String> METASTORE_CLIENT_CLASS =
+ ConfigOptions.key("metastore.client.class")
+ .stringType()
+ .defaultValue(HiveMetaStoreClient.class.getName())
+ .withDescription(
+ "Class name of Hive metastore client.\n"
+ + "NOTE: This class must directly implements "
+ + IMetaStoreClient.class.getName()
+ + ".");
+
@Override
public String identifier() {
return IDENTIFIER;
@@ -41,13 +54,18 @@ public class HiveCatalogFactory implements CatalogFactory {
public Catalog create(String warehouse, Configuration options) {
String uri =
Preconditions.checkNotNull(
- options.get(URI),
- URI.key() + " must be set for table store " + IDENTIFIER + " catalog");
+ options.get(CatalogOptions.URI),
+ CatalogOptions.URI.key()
+ + " must be set for table store "
+ + IDENTIFIER
+ + " catalog");
+
org.apache.hadoop.conf.Configuration hadoopConfig =
new org.apache.hadoop.conf.Configuration();
options.toMap().forEach(hadoopConfig::set);
hadoopConfig.set(HiveConf.ConfVars.METASTOREURIS.varname, uri);
hadoopConfig.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehouse);
- return new HiveCatalog(hadoopConfig);
+
+ return new HiveCatalog(hadoopConfig, options.get(METASTORE_CLIENT_CLASS));
}
}
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogLock.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogLock.java
index 64352278..e6960f36 100644
--- a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogLock.java
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogLock.java
@@ -112,8 +112,8 @@ public class HiveCatalogLock implements CatalogLock {
}
/** Create a hive lock factory. */
- public static CatalogLock.Factory createFactory(HiveConf hiveConf) {
- return new HiveCatalogLockFactory(hiveConf);
+ public static CatalogLock.Factory createFactory(HiveConf hiveConf, String clientClassName) {
+ return new HiveCatalogLockFactory(hiveConf, clientClassName);
}
private static class HiveCatalogLockFactory implements CatalogLock.Factory {
@@ -121,16 +121,20 @@ public class HiveCatalogLock implements CatalogLock {
private static final long serialVersionUID = 1L;
private final SerializableHiveConf hiveConf;
+ private final String clientClassName;
- public HiveCatalogLockFactory(HiveConf hiveConf) {
+ public HiveCatalogLockFactory(HiveConf hiveConf, String clientClassName) {
this.hiveConf = new SerializableHiveConf(hiveConf);
+ this.clientClassName = clientClassName;
}
@Override
public CatalogLock create() {
HiveConf conf = hiveConf.conf();
return new HiveCatalogLock(
- HiveCatalog.createClient(conf), checkMaxSleep(conf), acquireTimeout(conf));
+ HiveCatalog.createClient(conf, clientClassName),
+ checkMaxSleep(conf),
+ acquireTimeout(conf));
}
}
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/CustomHiveMetastoreClientITCase.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/CustomHiveMetastoreClientITCase.java
new file mode 100644
index 00000000..d979bbdf
--- /dev/null
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/CustomHiveMetastoreClientITCase.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.hive;
+
+import org.apache.flink.connectors.hive.FlinkEmbeddedHiveRunner;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.annotations.HiveSQL;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/** IT cases for {@link HiveCatalog} with custom Hive metastore client. */
+@RunWith(FlinkEmbeddedHiveRunner.class)
+public class CustomHiveMetastoreClientITCase {
+
+ @Rule public TemporaryFolder folder = new TemporaryFolder();
+
+ private String path;
+ private TableEnvironment tEnv;
+
+ @HiveSQL(files = {})
+ private static HiveShell hiveShell;
+
+ @Before
+ public void before() throws Exception {
+ hiveShell.execute("CREATE DATABASE IF NOT EXISTS test_db");
+ hiveShell.execute("USE test_db");
+ }
+
+ @After
+ public void after() {
+ hiveShell.execute("DROP DATABASE IF EXISTS test_db CASCADE");
+ }
+
+ @Test
+ public void testCustomMetastoreClient() throws Exception {
+ path = folder.newFolder().toURI().toString();
+ EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+ tEnv = TableEnvironmentImpl.create(settings);
+ tEnv.executeSql(
+ String.join(
+ "\n",
+ "CREATE CATALOG my_hive WITH (",
+ " 'type' = 'table-store',",
+ " 'metastore' = 'hive',",
+ " 'uri' = '',",
+ " 'warehouse' = '" + path + "',",
+ " 'metastore.client.class' = '"
+ + TestHiveMetaStoreClient.class.getName()
+ + "'",
+ ")"))
+ .await();
+ tEnv.executeSql("USE CATALOG my_hive").await();
+ Assert.assertEquals(
+ Arrays.asList(
+ Row.of("default"),
+ Row.of("test_db"),
+ Row.of(TestHiveMetaStoreClient.MOCK_DATABASE)),
+ collect("SHOW DATABASES"));
+ }
+
+ private List<Row> collect(String sql) throws Exception {
+ List<Row> result = new ArrayList<>();
+ try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {
+ while (it.hasNext()) {
+ result.add(it.next());
+ }
+ }
+ return result;
+ }
+}
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/TestHiveMetaStoreClient.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/TestHiveMetaStoreClient.java
new file mode 100644
index 00000000..a8e5d316
--- /dev/null
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/TestHiveMetaStoreClient.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.hive;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** A {@link HiveMetaStoreClient} to test custom Hive metastore client. */
+public class TestHiveMetaStoreClient extends HiveMetaStoreClient implements IMetaStoreClient {
+
+ public static final String MOCK_DATABASE = "test_mock_database";
+
+ public TestHiveMetaStoreClient(HiveConf conf) throws MetaException {
+ super(conf);
+ }
+
+ public TestHiveMetaStoreClient(HiveConf conf, HiveMetaHookLoader hookLoader)
+ throws MetaException {
+ super(conf, hookLoader);
+ }
+
+ public TestHiveMetaStoreClient(
+ HiveConf conf, HiveMetaHookLoader hookLoader, Boolean allowEmbedded)
+ throws MetaException {
+ super(conf, hookLoader, allowEmbedded);
+ }
+
+ @Override
+ public List<String> getAllDatabases() throws MetaException {
+ List<String> result = new ArrayList<>(super.getAllDatabases());
+ result.add(MOCK_DATABASE);
+ return result;
+ }
+}
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCaseSensitiveConverter.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCaseSensitiveConverter.java
index 562834f9..4329efca 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCaseSensitiveConverter.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCaseSensitiveConverter.java
@@ -35,14 +35,14 @@ public class SparkCaseSensitiveConverter {
private static final Set<String> CASE_SENSITIVE_KEYS = new HashSet<>();
// OSS access verification
- private static final String ACCESS_KEY_ID = "fs.oss.accessKeyId";
- private static final String ACCESS_KEY_SECRET = "fs.oss.accessKeySecret";
- private static final String SECURITY_TOKEN = "fs.oss.securityToken";
+ private static final String OSS_ACCESS_KEY_ID = "fs.oss.accessKeyId";
+ private static final String OSS_ACCESS_KEY_SECRET = "fs.oss.accessKeySecret";
+ private static final String OSS_SECURITY_TOKEN = "fs.oss.securityToken";
static {
- CASE_SENSITIVE_KEYS.add(ACCESS_KEY_ID);
- CASE_SENSITIVE_KEYS.add(ACCESS_KEY_SECRET);
- CASE_SENSITIVE_KEYS.add(SECURITY_TOKEN);
+ CASE_SENSITIVE_KEYS.add(OSS_ACCESS_KEY_ID);
+ CASE_SENSITIVE_KEYS.add(OSS_ACCESS_KEY_SECRET);
+ CASE_SENSITIVE_KEYS.add(OSS_SECURITY_TOKEN);
}
public static Map<String, String> convert(Map<String, String> caseInsensitiveOptions) {
diff --git a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkCaseSensitiveConverter.java b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkCaseSensitiveConverter.java
index e662430d..80736c47 100644
--- a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkCaseSensitiveConverter.java
+++ b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkCaseSensitiveConverter.java
@@ -38,14 +38,14 @@ public class SparkCaseSensitiveConverter {
private static final Set<String> CASE_SENSITIVE_KEYS = new HashSet<>();
// OSS access verification
- private static final String ACCESS_KEY_ID = "fs.oss.accessKeyId";
- private static final String ACCESS_KEY_SECRET = "fs.oss.accessKeySecret";
- private static final String SECURITY_TOKEN = "fs.oss.securityToken";
+ private static final String OSS_ACCESS_KEY_ID = "fs.oss.accessKeyId";
+ private static final String OSS_ACCESS_KEY_SECRET = "fs.oss.accessKeySecret";
+ private static final String OSS_SECURITY_TOKEN = "fs.oss.securityToken";
static {
- CASE_SENSITIVE_KEYS.add(ACCESS_KEY_ID);
- CASE_SENSITIVE_KEYS.add(ACCESS_KEY_SECRET);
- CASE_SENSITIVE_KEYS.add(SECURITY_TOKEN);
+ CASE_SENSITIVE_KEYS.add(OSS_ACCESS_KEY_ID);
+ CASE_SENSITIVE_KEYS.add(OSS_ACCESS_KEY_SECRET);
+ CASE_SENSITIVE_KEYS.add(OSS_SECURITY_TOKEN);
}
public static Map<String, String> convert(DataSourceOptions options) {