You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/05/31 18:21:40 UTC
[flink] branch master updated: [FLINK-12649][hive] Add a shim layer
to support multiple versions of Hive Metastore
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 038ab38 [FLINK-12649][hive] Add a shim layer to support multiple versions of Hive Metastore
038ab38 is described below
commit 038ab385c6f9af129b5eda7fe05d8b39d6122077
Author: Rui Li <li...@apache.org>
AuthorDate: Wed May 29 18:25:25 2019 +0800
[FLINK-12649][hive] Add a shim layer to support multiple versions of Hive Metastore
To add shim layer for HMS client, in order to support different versions of HMS.
This closes #8564.
---
flink-connectors/flink-connector-hive/pom.xml | 21 +-
.../flink/table/catalog/hive/HiveCatalog.java | 28 +--
.../catalog/hive/HiveMetastoreClientFactory.java | 34 ++++
.../catalog/hive/HiveMetastoreClientWrapper.java | 226 +++++++++++++++++++++
.../apache/flink/table/catalog/hive/HiveShim.java | 65 ++++++
.../flink/table/catalog/hive/HiveShimLoader.java | 57 ++++++
.../flink/table/catalog/hive/HiveShimV1.java | 82 ++++++++
.../flink/table/catalog/hive/HiveShimV2.java | 73 +++++++
8 files changed, 562 insertions(+), 24 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index 25e475e..5c205d8 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -83,6 +83,7 @@ under the License.
<groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
<version>${hive.version}</version>
+ <scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.hive</groupId>
@@ -385,7 +386,6 @@ under the License.
<include>commons-beanutils:commons-beanutils</include>
<include>com.fasterxml.jackson.core:*</include>
<include>com.jolbox:bonecp</include>
- <include>org.apache.hive:*</include>
<include>org.apache.thrift:libthrift</include>
<include>org.datanucleus:*</include>
<include>org.antlr:antlr-runtime</include>
@@ -423,4 +423,23 @@ under the License.
</plugin>
</plugins>
</build>
+
+ <profiles>
+ <!-- Activate this profile with -Phive-1.2.1 to build and test against hive-1.2.1 -->
+ <profile>
+ <id>hive-1.2.1</id>
+ <properties>
+ <hive.version>1.2.1</hive.version>
+ <hivemetastore.hadoop.version>2.6.0</hivemetastore.hadoop.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>javax.jdo</groupId>
+ <artifactId>jdo-api</artifactId>
+ <version>3.0.1</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
</project>
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 8022c45..562c198 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -53,10 +53,7 @@ import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Database;
@@ -108,7 +105,7 @@ public class HiveCatalog extends AbstractCatalog {
protected final HiveConf hiveConf;
- protected IMetaStoreClient client;
+ protected HiveMetastoreClientWrapper client;
public HiveCatalog(String catalogName, String hivemetastoreURI) {
this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI));
@@ -133,23 +130,10 @@ public class HiveCatalog extends AbstractCatalog {
return hiveConf;
}
- private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
- try {
- return RetryingMetaStoreClient.getProxy(
- hiveConf,
- null,
- null,
- HiveMetaStoreClient.class.getName(),
- true);
- } catch (MetaException e) {
- throw new CatalogException("Failed to create Hive metastore client", e);
- }
- }
-
@Override
public void open() throws CatalogException {
if (client == null) {
- client = getMetastoreClient(hiveConf);
+ client = HiveMetastoreClientFactory.create(hiveConf);
LOG.info("Connected to Hive metastore");
}
@@ -444,10 +428,7 @@ public class HiveCatalog extends AbstractCatalog {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName cannot be null or empty");
try {
- return client.getTables(
- databaseName,
- null, // table pattern
- TableType.VIRTUAL_VIEW);
+ return client.getViews(databaseName);
} catch (UnknownDBException e) {
throw new DatabaseNotExistException(getName(), databaseName);
} catch (TException e) {
@@ -996,7 +977,8 @@ public class HiveCatalog extends AbstractCatalog {
}
try {
- return client.getFunctions(databaseName, null);
+ // hive-1.x requires the pattern not being null, so pass a pattern that matches any name
+ return client.getFunctions(databaseName, ".*");
} catch (TException e) {
throw new CatalogException(
String.format("Failed to list functions in database %s", databaseName), e);
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveMetastoreClientFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveMetastoreClientFactory.java
new file mode 100644
index 0000000..be46552
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveMetastoreClientFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+
+/**
+ * Factory to create Hive metastore client.
+ */
+public class HiveMetastoreClientFactory {
+
+ private HiveMetastoreClientFactory() {
+ }
+
+ public static HiveMetastoreClientWrapper create(HiveConf hiveConf) {
+ return new HiveMetastoreClientWrapper(hiveConf);
+ }
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveMetastoreClientWrapper.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveMetastoreClientWrapper.java
new file mode 100644
index 0000000..43937ab
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveMetastoreClientWrapper.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.InvalidInputException;
+import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Wrapper class for Hive Metastore Client, which embeds a HiveShim layer to handle different Hive versions.
+ * Methods provided mostly conforms to IMetaStoreClient interfaces except those that require shims.
+ */
+@Internal
+public class HiveMetastoreClientWrapper implements AutoCloseable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HiveMetastoreClientWrapper.class);
+
+ private final IMetaStoreClient client;
+ private final HiveConf hiveConf;
+
+ public HiveMetastoreClientWrapper(HiveConf hiveConf) {
+ this.hiveConf = Preconditions.checkNotNull(hiveConf, "HiveConf cannot be null");
+ client = createMetastoreClient();
+ }
+
+ @Override
+ public void close() {
+ client.close();
+ }
+
+ public List<String> getDatabases(String pattern) throws MetaException, TException {
+ return client.getDatabases(pattern);
+ }
+
+ public List<String> getAllDatabases() throws MetaException, TException {
+ return client.getAllDatabases();
+ }
+
+ public List<String> getAllTables(String databaseName) throws MetaException, TException, UnknownDBException {
+ return client.getAllTables(databaseName);
+ }
+
+ public void dropTable(String databaseName, String tableName)
+ throws MetaException, TException, NoSuchObjectException {
+ client.dropTable(databaseName, tableName);
+ }
+
+ public void dropTable(String dbName, String tableName, boolean deleteData, boolean ignoreUnknownTable)
+ throws MetaException, NoSuchObjectException, TException {
+ client.dropTable(dbName, tableName, deleteData, ignoreUnknownTable);
+ }
+
+ public boolean tableExists(String databaseName, String tableName)
+ throws MetaException, TException, UnknownDBException {
+ return client.tableExists(databaseName, tableName);
+ }
+
+ public Database getDatabase(String name) throws NoSuchObjectException, MetaException, TException {
+ return client.getDatabase(name);
+ }
+
+ public Table getTable(String databaseName, String tableName)
+ throws MetaException, NoSuchObjectException, TException {
+ return client.getTable(databaseName, tableName);
+ }
+
+ public Partition add_partition(Partition partition)
+ throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+ return client.add_partition(partition);
+ }
+
+ public int add_partitions(List<Partition> partitionList)
+ throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+ return client.add_partitions(partitionList);
+ }
+
+ public Partition getPartition(String databaseName, String tableName, List<String> list)
+ throws NoSuchObjectException, MetaException, TException {
+ return client.getPartition(databaseName, tableName, list);
+ }
+
+ public List<String> listPartitionNames(String databaseName, String tableName, short maxPartitions)
+ throws MetaException, TException {
+ return client.listPartitionNames(databaseName, tableName, maxPartitions);
+ }
+
+ public List<String> listPartitionNames(String databaseName, String tableName, List<String> partitionValues,
+ short maxPartitions) throws MetaException, TException, NoSuchObjectException {
+ return client.listPartitionNames(databaseName, tableName, partitionValues, maxPartitions);
+ }
+
+ public void createTable(Table table)
+ throws AlreadyExistsException, InvalidObjectException, MetaException, NoSuchObjectException, TException {
+ client.createTable(table);
+ }
+
+ public void alter_table(String databaseName, String tableName, Table table)
+ throws InvalidOperationException, MetaException, TException {
+ client.alter_table(databaseName, tableName, table);
+ }
+
+ public void createDatabase(Database database)
+ throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+ client.createDatabase(database);
+ }
+
+ public void dropDatabase(String name, boolean deleteData, boolean ignoreIfNotExists)
+ throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+ client.dropDatabase(name, deleteData, ignoreIfNotExists);
+ }
+
+ public void alterDatabase(String name, Database database) throws NoSuchObjectException, MetaException, TException {
+ client.alterDatabase(name, database);
+ }
+
+ public boolean dropPartition(String databaseName, String tableName, List<String> partitionValues, boolean deleteData)
+ throws NoSuchObjectException, MetaException, TException {
+ return client.dropPartition(databaseName, tableName, partitionValues, deleteData);
+ }
+
+ public void alter_partition(String databaseName, String tableName, Partition partition)
+ throws InvalidOperationException, MetaException, TException {
+ client.alter_partition(databaseName, tableName, partition);
+ }
+
+ public void renamePartition(String databaseName, String tableName, List<String> partitionValues, Partition partition)
+ throws InvalidOperationException, MetaException, TException {
+ client.renamePartition(databaseName, tableName, partitionValues, partition);
+ }
+
+ public void createFunction(Function function) throws InvalidObjectException, MetaException, TException {
+ client.createFunction(function);
+ }
+
+ public void alterFunction(String databaseName, String functionName, Function function)
+ throws InvalidObjectException, MetaException, TException {
+ client.alterFunction(databaseName, functionName, function);
+ }
+
+ public void dropFunction(String databaseName, String functionName)
+ throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException, TException {
+ client.dropFunction(databaseName, functionName);
+ }
+
+ public List<String> getFunctions(String databaseName, String pattern) throws MetaException, TException {
+ return client.getFunctions(databaseName, pattern);
+ }
+
+ List<ColumnStatisticsObj> getTableColumnStatistics(String databaseName, String tableName, List<String> columnNames)
+ throws NoSuchObjectException, MetaException, TException {
+ return client.getTableColumnStatistics(databaseName, tableName, columnNames);
+ }
+
+ Map<String, List<ColumnStatisticsObj>> getPartitionColumnStatistics(String dbName, String tableName,
+ List<String> partNames, List<String> colNames)
+ throws NoSuchObjectException, MetaException, TException {
+ return client.getPartitionColumnStatistics(dbName, tableName, partNames, colNames);
+ }
+
+ public boolean updateTableColumnStatistics(ColumnStatistics columnStatistics)
+ throws NoSuchObjectException, InvalidObjectException, MetaException, TException, InvalidInputException {
+ return client.updateTableColumnStatistics(columnStatistics);
+ }
+
+ public List<Partition> listPartitions(String dbName, String tblName, List<String> partVals, short max) throws TException {
+ return client.listPartitions(dbName, tblName, partVals, max);
+ }
+
+ public List<Partition> listPartitions(String dbName, String tblName, short max) throws TException {
+ return client.listPartitions(dbName, tblName, max);
+ }
+
+ //-------- Start of shimmed methods ----------
+
+ public List<String> getViews(String databaseName) throws UnknownDBException, TException {
+ HiveShim hiveShim = HiveShimLoader.loadHiveShim();
+ return hiveShim.getViews(client, databaseName);
+ }
+
+ private IMetaStoreClient createMetastoreClient() {
+ HiveShim hiveShim = HiveShimLoader.loadHiveShim();
+ return hiveShim.getHiveMetastoreClient(hiveConf);
+ }
+
+ public Function getFunction(String databaseName, String functionName) throws MetaException, TException {
+ HiveShim hiveShim = HiveShimLoader.loadHiveShim();
+ return hiveShim.getFunction(client, databaseName, functionName);
+ }
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveShim.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveShim.java
new file mode 100644
index 0000000..422dfe6
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveShim.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.thrift.TException;
+
+import java.util.List;
+
+/**
+ * A shim layer to support different versions of HMS.
+ */
+public interface HiveShim {
+
+ /**
+ * Create a Hive Metastore client based on the given HiveConf object.
+ *
+ * @param hiveConf HiveConf instance
+ * @return an IMetaStoreClient instance
+ */
+ IMetaStoreClient getHiveMetastoreClient(HiveConf hiveConf);
+
+ /**
+ * Get a list of views in the given database from the given Hive Metastore client.
+ *
+ * @param client Hive Metastore client
+ * @param databaseName the name of the database
+ * @return A list of names of the views
+ * @throws UnknownDBException if the database doesn't exist
+ * @throws TException for any other generic exceptions caused by Thrift
+ */
+ List<String> getViews(IMetaStoreClient client, String databaseName) throws UnknownDBException, TException;
+
+ /**
+ * Gets a function from a database with the given HMS client.
+ *
+ * @param client the Hive Metastore client
+ * @param dbName name of the database
+ * @param functionName name of the function
+ * @return the Function under the specified name
+ * @throws NoSuchObjectException if the function doesn't exist
+ * @throws TException for any other generic exceptions caused by Thrift
+ */
+ Function getFunction(IMetaStoreClient client, String dbName, String functionName) throws NoSuchObjectException, TException;
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveShimLoader.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveShimLoader.java
new file mode 100644
index 0000000..ef90547
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveShimLoader.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+
+import org.apache.hive.common.util.HiveVersionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A loader to load HiveShim.
+ */
+public class HiveShimLoader {
+
+ private static final String HIVE_V1_VERSION_NAME = "1.2.1";
+ private static final String HIVE_V2_VERSION_NAME = "2.3.4";
+
+ private static final Map<String, HiveShim> hiveShims = new ConcurrentHashMap<>(2);
+
+ private static final Logger LOG = LoggerFactory.getLogger(HiveShimLoader.class);
+
+ private HiveShimLoader() {
+ }
+
+ public static HiveShim loadHiveShim() {
+ String version = HiveVersionInfo.getVersion();
+ return hiveShims.computeIfAbsent(version, (v) -> {
+ if (v.startsWith(HIVE_V1_VERSION_NAME)) {
+ return new HiveShimV1();
+ }
+ if (v.startsWith(HIVE_V2_VERSION_NAME)) {
+ return new HiveShimV2();
+ }
+ throw new CatalogException("Unsupported Hive version " + v);
+ });
+ }
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveShimV1.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveShimV1.java
new file mode 100644
index 0000000..1048f52
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveShimV1.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.thrift.TException;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Shim for Hive version 1.x.
+ */
+public class HiveShimV1 implements HiveShim {
+
+ @Override
+ public IMetaStoreClient getHiveMetastoreClient(HiveConf hiveConf) {
+ try {
+ Method method = RetryingMetaStoreClient.class.getMethod("getProxy", HiveConf.class);
+ // getProxy is a static method
+ return (IMetaStoreClient) method.invoke(null, (hiveConf));
+ } catch (Exception ex) {
+ throw new CatalogException("Failed to create Hive Metastore client", ex);
+ }
+ }
+
+ @Override
+ // 1.x client doesn't support filtering tables by type, so here we need to get all tables and filter by ourselves
+ public List<String> getViews(IMetaStoreClient client, String databaseName) throws UnknownDBException, TException {
+ // We don't have to use reflection here because client.getAllTables(String) is supposed to be there for
+ // all versions.
+ List<String> tableNames = client.getAllTables(databaseName);
+ List<String> views = new ArrayList<>();
+ for (String name : tableNames) {
+ Table table = client.getTable(databaseName, name);
+ String viewDef = table.getViewOriginalText();
+ if (viewDef != null && !viewDef.isEmpty()) {
+ views.add(table.getTableName());
+ }
+ }
+ return views;
+ }
+
+ @Override
+ public Function getFunction(IMetaStoreClient client, String dbName, String functionName) throws NoSuchObjectException, TException {
+ try {
+ // hive-1.x doesn't throw NoSuchObjectException if function doesn't exist, instead it throws a MetaException
+ return client.getFunction(dbName, functionName);
+ } catch (MetaException e) {
+ if (e.getCause() instanceof NoSuchObjectException) {
+ throw (NoSuchObjectException) e.getCause();
+ }
+ throw e;
+ }
+ }
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveShimV2.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveShimV2.java
new file mode 100644
index 0000000..fefb48f
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveShimV2.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.thrift.TException;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+
+/**
+ * Shim for Hive version 2.x.
+ */
+public class HiveShimV2 implements HiveShim {
+
+ @Override
+ public IMetaStoreClient getHiveMetastoreClient(HiveConf hiveConf) {
+ try {
+ Method method = RetryingMetaStoreClient.class.getMethod("getProxy", HiveConf.class, Boolean.TYPE);
+ // getProxy is a static method
+ return (IMetaStoreClient) method.invoke(null, hiveConf, true);
+ } catch (Exception ex) {
+ throw new CatalogException("Failed to create Hive Metastore client", ex);
+ }
+ }
+
+ @Override
+ public List<String> getViews(IMetaStoreClient client, String databaseName) throws UnknownDBException, TException {
+ try {
+ Method method = client.getClass().getMethod("getTables", String.class, String.class, TableType.class);
+ return (List<String>) method.invoke(client, databaseName, null, TableType.VIRTUAL_VIEW);
+ } catch (InvocationTargetException ite) {
+ Throwable targetEx = ite.getTargetException();
+ if (targetEx instanceof TException) {
+ throw (TException) targetEx;
+ } else {
+ throw new CatalogException(String.format("Failed to get views for %s", databaseName), targetEx);
+ }
+ } catch (NoSuchMethodException | IllegalAccessException e) {
+ throw new CatalogException(String.format("Failed to get views for %s", databaseName), e);
+ }
+ }
+
+ @Override
+ public Function getFunction(IMetaStoreClient client, String dbName, String functionName) throws NoSuchObjectException, TException {
+ return client.getFunction(dbName, functionName);
+ }
+}