You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/09/24 10:14:55 UTC
[flink] branch release-1.11 updated: [FLINK-19361][hive] Create a synchronized metastore client to talk to a remote HMS
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 2ca2f9c [FLINK-19361][hive] Create a synchronized metastore client to talk to a remote HMS
2ca2f9c is described below
commit 2ca2f9cddfda4141a9bd2327fc3753e3ef14e54f
Author: Rui Li <li...@apache.org>
AuthorDate: Thu Sep 24 18:13:33 2020 +0800
[FLINK-19361][hive] Create a synchronized metastore client to talk to a remote HMS
This closes #13469
---
.../apache/flink/table/catalog/hive/HiveCatalog.java | 7 ++++++-
.../hive/client/HiveMetastoreClientWrapper.java | 7 ++++++-
.../flink/table/catalog/hive/HiveCatalogITCase.java | 20 ++++++++++++++++++++
3 files changed, 32 insertions(+), 2 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 46eb2fd..bf53c5d 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -170,7 +170,7 @@ public class HiveCatalog extends AbstractCatalog {
this.hiveConf = hiveConf == null ? createHiveConf(null) : hiveConf;
if (!allowEmbedded) {
- checkArgument(!StringUtils.isNullOrWhitespaceOnly(this.hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS)),
+ checkArgument(!isEmbeddedMetastore(this.hiveConf),
"Embedded metastore is not allowed. Make sure you have set a valid value for " +
HiveConf.ConfVars.METASTOREURIS.toString());
}
@@ -1470,4 +1470,9 @@ public class HiveCatalog extends AbstractCatalog {
throw new CatalogException("Unsupported alter table operation " + alterOp);
}
}
+
+ @VisibleForTesting
+ public static boolean isEmbeddedMetastore(HiveConf hiveConf) {
+ return StringUtils.isNullOrWhitespaceOnly(hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS));
+ }
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java
index 63a557b..e456b6bf 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java
@@ -20,11 +20,13 @@ package org.apache.flink.table.catalog.hive.client;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.constraints.UniqueConstraint;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
@@ -68,7 +70,10 @@ public class HiveMetastoreClientWrapper implements AutoCloseable {
this.hiveConf = Preconditions.checkNotNull(hiveConf, "HiveConf cannot be null");
checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveVersion), "hiveVersion cannot be null or empty");
hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
- client = createMetastoreClient();
+ // use synchronized client in case we're talking to a remote HMS
+ client = HiveCatalog.isEmbeddedMetastore(hiveConf) ?
+ createMetastoreClient() :
+ HiveMetaStoreClient.newSynchronizedClient(createMetastoreClient());
}
@Override
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
index ba12079..1030744 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
@@ -67,6 +67,11 @@ import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
@@ -413,4 +418,19 @@ public class HiveCatalogITCase {
tEnv.executeSql("DROP TABLE print_table");
}
}
+
+ @Test
+ public void testConcurrentAccessHiveCatalog() throws Exception {
+ int numThreads = 5;
+ ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+ Callable<List<String>> listDBCallable = () -> hiveCatalog.listDatabases();
+ List<Future<List<String>>> listDBFutures = new ArrayList<>();
+ for (int i = 0; i < numThreads; i++) {
+ listDBFutures.add(executorService.submit(listDBCallable));
+ }
+ executorService.shutdown();
+ for (Future<List<String>> future : listDBFutures) {
+ future.get(5, TimeUnit.SECONDS);
+ }
+ }
}