Posted to commits@flink.apache.org by lz...@apache.org on 2020/09/24 10:14:55 UTC

[flink] branch release-1.11 updated: [FLINK-19361][hive] Create a synchronized metastore client to talk to a remote HMS

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 2ca2f9c  [FLINK-19361][hive] Create a synchronized metastore client to talk to a remote HMS
2ca2f9c is described below

commit 2ca2f9cddfda4141a9bd2327fc3753e3ef14e54f
Author: Rui Li <li...@apache.org>
AuthorDate: Thu Sep 24 18:13:33 2020 +0800

    [FLINK-19361][hive] Create a synchronized metastore client to talk to a remote HMS
    
    This closes #13469
---
 .../apache/flink/table/catalog/hive/HiveCatalog.java |  7 ++++++-
 .../hive/client/HiveMetastoreClientWrapper.java      |  7 ++++++-
 .../flink/table/catalog/hive/HiveCatalogITCase.java  | 20 ++++++++++++++++++++
 3 files changed, 32 insertions(+), 2 deletions(-)

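Context for the change below: the Thrift-backed HiveMetaStoreClient is not safe for concurrent use, so when HiveCatalog talks to a remote HMS the client wrapper now hands out a client obtained from HiveMetaStoreClient.newSynchronizedClient(...), which serializes all calls on the underlying client. The following sketch only illustrates the general idea of such a synchronized proxy; it is not the Hive implementation and none of its names come from this commit.

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;

import org.apache.hadoop.hive.metastore.IMetaStoreClient;

/** Illustrative sketch: a reflective proxy that serializes every call to the delegate client. */
public final class SynchronizedMetastoreClientSketch {

	public static IMetaStoreClient wrap(IMetaStoreClient delegate) {
		final Object lock = new Object();
		InvocationHandler handler = (proxy, method, args) -> {
			synchronized (lock) {
				try {
					// Forward the call while holding the lock so two threads never
					// interleave requests on the same underlying connection.
					return method.invoke(delegate, args);
				} catch (InvocationTargetException e) {
					// Re-throw what the delegate actually threw, not the reflection wrapper.
					throw e.getCause();
				}
			}
		};
		return (IMetaStoreClient) Proxy.newProxyInstance(
				IMetaStoreClient.class.getClassLoader(),
				new Class<?>[] {IMetaStoreClient.class},
				handler);
	}
}

With that in mind, the diffs below do three things: expose an isEmbeddedMetastore check on HiveCatalog, wrap the metastore client in HiveMetastoreClientWrapper whenever the metastore is remote, and add an ITCase that exercises the catalog from several threads at once.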
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 46eb2fd..bf53c5d 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -170,7 +170,7 @@ public class HiveCatalog extends AbstractCatalog {
 
 		this.hiveConf = hiveConf == null ? createHiveConf(null) : hiveConf;
 		if (!allowEmbedded) {
-			checkArgument(!StringUtils.isNullOrWhitespaceOnly(this.hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS)),
+			checkArgument(!isEmbeddedMetastore(this.hiveConf),
 					"Embedded metastore is not allowed. Make sure you have set a valid value for " +
 							HiveConf.ConfVars.METASTOREURIS.toString());
 		}
@@ -1470,4 +1470,9 @@ public class HiveCatalog extends AbstractCatalog {
 				throw new CatalogException("Unsupported alter table operation " + alterOp);
 		}
 	}
+
+	@VisibleForTesting
+	public static boolean isEmbeddedMetastore(HiveConf hiveConf) {
+		return StringUtils.isNullOrWhitespaceOnly(hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS));
+	}
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java
index 63a557b..e456b6bf 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java
@@ -20,11 +20,13 @@ package org.apache.flink.table.catalog.hive.client;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.constraints.UniqueConstraint;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
@@ -68,7 +70,10 @@ public class HiveMetastoreClientWrapper implements AutoCloseable {
 		this.hiveConf = Preconditions.checkNotNull(hiveConf, "HiveConf cannot be null");
 		checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveVersion), "hiveVersion cannot be null or empty");
 		hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
-		client = createMetastoreClient();
+		// use synchronized client in case we're talking to a remote HMS
+		client = HiveCatalog.isEmbeddedMetastore(hiveConf) ?
+				createMetastoreClient() :
+				HiveMetaStoreClient.newSynchronizedClient(createMetastoreClient());
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
index ba12079..1030744 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
@@ -67,6 +67,11 @@ import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.table.api.Expressions.$;
 import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
@@ -413,4 +418,19 @@ public class HiveCatalogITCase {
 			tEnv.executeSql("DROP TABLE print_table");
 		}
 	}
+
+	@Test
+	public void testConcurrentAccessHiveCatalog() throws Exception {
+		int numThreads = 5;
+		ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+		Callable<List<String>> listDBCallable = () -> hiveCatalog.listDatabases();
+		List<Future<List<String>>> listDBFutures = new ArrayList<>();
+		for (int i = 0; i < numThreads; i++) {
+			listDBFutures.add(executorService.submit(listDBCallable));
+		}
+		executorService.shutdown();
+		for (Future<List<String>> future : listDBFutures) {
+			future.get(5, TimeUnit.SECONDS);
+		}
+	}
 }
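As a usage sketch of the changed wrapper (the metastore URI, Hive version, and the getAllDatabases call are illustrative assumptions, not taken from this commit): once hive.metastore.uris points at a remote HMS, the wrapper is no longer considered embedded and internally returns a synchronized client, so a single instance can be shared by concurrent catalog operations.

import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;

import org.apache.hadoop.hive.conf.HiveConf;

/** Usage sketch only: host, port, and Hive version below are hypothetical. */
public class RemoteHmsClientExample {
	public static void main(String[] args) throws Exception {
		HiveConf hiveConf = new HiveConf();
		// With METASTOREURIS set, isEmbeddedMetastore(hiveConf) is false and the
		// wrapper builds its client via HiveMetaStoreClient.newSynchronizedClient.
		hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://hms-host:9083");

		HiveMetastoreClientWrapper client = new HiveMetastoreClientWrapper(hiveConf, "2.3.4");
		try {
			// Safe to invoke from multiple threads sharing this wrapper instance.
			System.out.println(client.getAllDatabases());
		} finally {
			client.close();
		}
	}
}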