Posted to commits@flink.apache.org by bl...@apache.org on 2019/05/31 18:21:40 UTC

[flink] branch master updated: [FLINK-12649][hive] Add a shim layer to support multiple versions of Hive Metastore

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 038ab38  [FLINK-12649][hive] Add a shim layer to support multiple versions of Hive Metastore
038ab38 is described below

commit 038ab385c6f9af129b5eda7fe05d8b39d6122077
Author: Rui Li <li...@apache.org>
AuthorDate: Wed May 29 18:25:25 2019 +0800

    [FLINK-12649][hive] Add a shim layer to support multiple versions of Hive Metastore
    
    This adds a shim layer for the HMS client in order to support different versions of the Hive Metastore (HMS).
    
    This closes #8564.
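    
    A minimal sketch of the resulting call flow (ShimSketch is a hypothetical
    class name; assumes a hive-site.xml on the classpath that points at a
    running metastore):
    
        import org.apache.flink.table.catalog.hive.HiveMetastoreClientFactory;
        import org.apache.flink.table.catalog.hive.HiveMetastoreClientWrapper;
        import org.apache.hadoop.hive.conf.HiveConf;
        
        import java.util.List;
        
        public class ShimSketch {
            public static void main(String[] args) throws Exception {
                HiveConf hiveConf = new HiveConf();
                // The factory returns a wrapper; the wrapper asks HiveShimLoader for
                // the shim matching the Hive version on the classpath, and that shim
                // creates the underlying IMetaStoreClient.
                try (HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf)) {
                    // getViews is a shimmed call: views are filtered client-side on
                    // hive-1.x and fetched via getTables(db, pattern, VIRTUAL_VIEW)
                    // on hive-2.x.
                    List<String> views = client.getViews("default");
                    System.out.println(views);
                }
            }
        }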
---
 flink-connectors/flink-connector-hive/pom.xml      |  21 +-
 .../flink/table/catalog/hive/HiveCatalog.java      |  28 +--
 .../catalog/hive/HiveMetastoreClientFactory.java   |  34 ++++
 .../catalog/hive/HiveMetastoreClientWrapper.java   | 226 +++++++++++++++++++++
 .../apache/flink/table/catalog/hive/HiveShim.java  |  65 ++++++
 .../flink/table/catalog/hive/HiveShimLoader.java   |  57 ++++++
 .../flink/table/catalog/hive/HiveShimV1.java       |  82 ++++++++
 .../flink/table/catalog/hive/HiveShimV2.java       |  73 +++++++
 8 files changed, 562 insertions(+), 24 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index 25e475e..5c205d8 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -83,6 +83,7 @@ under the License.
 			<groupId>org.apache.hive</groupId>
 			<artifactId>hive-metastore</artifactId>
 			<version>${hive.version}</version>
+			<scope>provided</scope>
 			<exclusions>
 				<exclusion>
 					<groupId>org.apache.hive</groupId>
@@ -385,7 +386,6 @@ under the License.
 									<include>commons-beanutils:commons-beanutils</include>
 									<include>com.fasterxml.jackson.core:*</include>
 									<include>com.jolbox:bonecp</include>
-									<include>org.apache.hive:*</include>
 									<include>org.apache.thrift:libthrift</include>
 									<include>org.datanucleus:*</include>
 									<include>org.antlr:antlr-runtime</include>
@@ -423,4 +423,23 @@ under the License.
 			</plugin>
 		</plugins>
 	</build>
+
+	<profiles>
+		<!-- Activate this profile with -Phive-1.2.1 to build and test against hive-1.2.1 -->
+		<profile>
+			<id>hive-1.2.1</id>
+			<properties>
+				<hive.version>1.2.1</hive.version>
+				<hivemetastore.hadoop.version>2.6.0</hivemetastore.hadoop.version>
+			</properties>
+			<dependencies>
+				<dependency>
+					<groupId>javax.jdo</groupId>
+					<artifactId>jdo-api</artifactId>
+					<version>3.0.1</version>
+					<scope>provided</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+	</profiles>
 </project>
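
For reference, the new profile is activated via standard Maven profile selection;
a typical invocation, run from within flink-connectors/flink-connector-hive, might be:

    mvn clean test -Phive-1.2.1
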
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 8022c45..562c198 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -53,10 +53,7 @@ import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.util.StringUtils;
 
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -108,7 +105,7 @@ public class HiveCatalog extends AbstractCatalog {
 
 	protected final HiveConf hiveConf;
 
-	protected IMetaStoreClient client;
+	protected HiveMetastoreClientWrapper client;
 
 	public HiveCatalog(String catalogName, String hivemetastoreURI) {
 		this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI));
@@ -133,23 +130,10 @@ public class HiveCatalog extends AbstractCatalog {
 		return hiveConf;
 	}
 
-	private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
-		try {
-			return RetryingMetaStoreClient.getProxy(
-				hiveConf,
-				null,
-				null,
-				HiveMetaStoreClient.class.getName(),
-				true);
-		} catch (MetaException e) {
-			throw new CatalogException("Failed to create Hive metastore client", e);
-		}
-	}
-
 	@Override
 	public void open() throws CatalogException {
 		if (client == null) {
-			client = getMetastoreClient(hiveConf);
+			client = HiveMetastoreClientFactory.create(hiveConf);
 			LOG.info("Connected to Hive metastore");
 		}
 
@@ -444,10 +428,7 @@ public class HiveCatalog extends AbstractCatalog {
 		checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName cannot be null or empty");
 
 		try {
-			return client.getTables(
-				databaseName,
-				null, // table pattern
-				TableType.VIRTUAL_VIEW);
+			return client.getViews(databaseName);
 		} catch (UnknownDBException e) {
 			throw new DatabaseNotExistException(getName(), databaseName);
 		} catch (TException e) {
@@ -996,7 +977,8 @@ public class HiveCatalog extends AbstractCatalog {
 		}
 
 		try {
-			return client.getFunctions(databaseName, null);
+			// hive-1.x requires the pattern to be non-null, so pass a pattern that matches any name
+			return client.getFunctions(databaseName, ".*");
 		} catch (TException e) {
 			throw new CatalogException(
 				String.format("Failed to list functions in database %s", databaseName), e);
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveMetastoreClientFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveMetastoreClientFactory.java
new file mode 100644
index 0000000..be46552
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveMetastoreClientFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+
+/**
+ * Factory to create a Hive Metastore client wrapper.
+ */
+public class HiveMetastoreClientFactory {
+
+	private HiveMetastoreClientFactory() {
+	}
+
+	public static HiveMetastoreClientWrapper create(HiveConf hiveConf) {
+		return new HiveMetastoreClientWrapper(hiveConf);
+	}
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveMetastoreClientWrapper.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveMetastoreClientWrapper.java
new file mode 100644
index 0000000..43937ab
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveMetastoreClientWrapper.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.InvalidInputException;
+import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Wrapper class for Hive Metastore Client, which embeds a HiveShim layer to handle different Hive versions.
+ * The methods provided mostly conform to the IMetaStoreClient interface, except for those that require shims.
+ */
+@Internal
+public class HiveMetastoreClientWrapper implements AutoCloseable {
+
+	private static final Logger LOG = LoggerFactory.getLogger(HiveMetastoreClientWrapper.class);
+
+	private final IMetaStoreClient client;
+	private final HiveConf hiveConf;
+
+	public HiveMetastoreClientWrapper(HiveConf hiveConf) {
+		this.hiveConf = Preconditions.checkNotNull(hiveConf, "HiveConf cannot be null");
+		client = createMetastoreClient();
+	}
+
+	@Override
+	public void close() {
+		client.close();
+	}
+
+	public List<String> getDatabases(String pattern) throws MetaException, TException {
+		return client.getDatabases(pattern);
+	}
+
+	public List<String> getAllDatabases() throws MetaException, TException {
+		return client.getAllDatabases();
+	}
+
+	public List<String> getAllTables(String databaseName) throws MetaException, TException, UnknownDBException {
+		return client.getAllTables(databaseName);
+	}
+
+	public void dropTable(String databaseName, String tableName)
+			throws MetaException, TException, NoSuchObjectException {
+		client.dropTable(databaseName, tableName);
+	}
+
+	public void dropTable(String dbName, String tableName, boolean deleteData, boolean ignoreUnknownTable)
+			throws MetaException, NoSuchObjectException, TException {
+		client.dropTable(dbName, tableName, deleteData, ignoreUnknownTable);
+	}
+
+	public boolean tableExists(String databaseName, String tableName)
+			throws MetaException, TException, UnknownDBException {
+		return client.tableExists(databaseName, tableName);
+	}
+
+	public Database getDatabase(String name) throws NoSuchObjectException, MetaException, TException {
+		return client.getDatabase(name);
+	}
+
+	public Table getTable(String databaseName, String tableName)
+			throws MetaException, NoSuchObjectException, TException {
+		return client.getTable(databaseName, tableName);
+	}
+
+	public Partition add_partition(Partition partition)
+			throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+		return client.add_partition(partition);
+	}
+
+	public int add_partitions(List<Partition> partitionList)
+			throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+		return client.add_partitions(partitionList);
+	}
+
+	public Partition getPartition(String databaseName, String tableName, List<String> list)
+			throws NoSuchObjectException, MetaException, TException {
+		return client.getPartition(databaseName, tableName, list);
+	}
+
+	public List<String> listPartitionNames(String databaseName, String tableName, short maxPartitions)
+			throws MetaException, TException {
+		return client.listPartitionNames(databaseName, tableName, maxPartitions);
+	}
+
+	public List<String> listPartitionNames(String databaseName, String tableName, List<String> partitionValues,
+										short maxPartitions) throws MetaException, TException, NoSuchObjectException {
+		return client.listPartitionNames(databaseName, tableName, partitionValues, maxPartitions);
+	}
+
+	public void createTable(Table table)
+			throws AlreadyExistsException, InvalidObjectException, MetaException, NoSuchObjectException, TException {
+		client.createTable(table);
+	}
+
+	public void alter_table(String databaseName, String tableName, Table table)
+			throws InvalidOperationException, MetaException, TException {
+		client.alter_table(databaseName, tableName, table);
+	}
+
+	public void createDatabase(Database database)
+			throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+		client.createDatabase(database);
+	}
+
+	public void dropDatabase(String name, boolean deleteData, boolean ignoreIfNotExists)
+			throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+		client.dropDatabase(name, deleteData, ignoreIfNotExists);
+	}
+
+	public void alterDatabase(String name, Database database) throws NoSuchObjectException, MetaException, TException {
+		client.alterDatabase(name, database);
+	}
+
+	public boolean dropPartition(String databaseName, String tableName, List<String> partitionValues, boolean deleteData)
+			throws NoSuchObjectException, MetaException, TException {
+		return client.dropPartition(databaseName, tableName, partitionValues, deleteData);
+	}
+
+	public void alter_partition(String databaseName, String tableName, Partition partition)
+			throws InvalidOperationException, MetaException, TException {
+		client.alter_partition(databaseName, tableName, partition);
+	}
+
+	public void renamePartition(String databaseName, String tableName, List<String> partitionValues, Partition partition)
+			throws InvalidOperationException, MetaException, TException {
+		client.renamePartition(databaseName, tableName, partitionValues, partition);
+	}
+
+	public void createFunction(Function function) throws InvalidObjectException, MetaException, TException {
+		client.createFunction(function);
+	}
+
+	public void alterFunction(String databaseName, String functionName, Function function)
+			throws InvalidObjectException, MetaException, TException {
+		client.alterFunction(databaseName, functionName, function);
+	}
+
+	public void dropFunction(String databaseName, String functionName)
+			throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException, TException {
+		client.dropFunction(databaseName, functionName);
+	}
+
+	public List<String> getFunctions(String databaseName, String pattern) throws MetaException, TException {
+		return client.getFunctions(databaseName, pattern);
+	}
+
+	List<ColumnStatisticsObj> getTableColumnStatistics(String databaseName, String tableName, List<String> columnNames)
+			throws NoSuchObjectException, MetaException, TException {
+		return client.getTableColumnStatistics(databaseName, tableName, columnNames);
+	}
+
+	Map<String, List<ColumnStatisticsObj>> getPartitionColumnStatistics(String dbName, String tableName,
+																		List<String> partNames, List<String> colNames)
+			throws NoSuchObjectException, MetaException, TException {
+		return client.getPartitionColumnStatistics(dbName, tableName, partNames, colNames);
+	}
+
+	public boolean updateTableColumnStatistics(ColumnStatistics columnStatistics)
+			throws NoSuchObjectException, InvalidObjectException, MetaException, TException, InvalidInputException {
+		return client.updateTableColumnStatistics(columnStatistics);
+	}
+
+	public List<Partition> listPartitions(String dbName, String tblName, List<String> partVals, short max) throws TException {
+		return client.listPartitions(dbName, tblName, partVals, max);
+	}
+
+	public List<Partition> listPartitions(String dbName, String tblName, short max) throws TException {
+		return client.listPartitions(dbName, tblName, max);
+	}
+
+	//-------- Start of shimmed methods ----------
+
+	public List<String> getViews(String databaseName) throws UnknownDBException, TException {
+		HiveShim hiveShim = HiveShimLoader.loadHiveShim();
+		return hiveShim.getViews(client, databaseName);
+	}
+
+	private IMetaStoreClient createMetastoreClient() {
+		HiveShim hiveShim = HiveShimLoader.loadHiveShim();
+		return hiveShim.getHiveMetastoreClient(hiveConf);
+	}
+
+	public Function getFunction(String databaseName, String functionName) throws MetaException, TException {
+		HiveShim hiveShim = HiveShimLoader.loadHiveShim();
+		return hiveShim.getFunction(client, databaseName, functionName);
+	}
+}
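
The wrapper thus keeps two kinds of methods side by side. From a caller's point of
view the split looks roughly like this (a sketch only: "client" is assumed to be an
open wrapper and "my_udf" is an illustrative function name):

    // Plain delegation: forwarded verbatim to the embedded IMetaStoreClient.
    List<String> tables = client.getAllTables("default");

    // Shimmed call: routed through HiveShimLoader, so version quirks (e.g.
    // hive-1.x reporting a missing function via MetaException rather than
    // NoSuchObjectException) stay hidden from callers.
    Function udf = client.getFunction("default", "my_udf");
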
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveShim.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveShim.java
new file mode 100644
index 0000000..422dfe6
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveShim.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.thrift.TException;
+
+import java.util.List;
+
+/**
+ * A shim layer to support different versions of HMS.
+ */
+public interface HiveShim {
+
+	/**
+	 * Create a Hive Metastore client based on the given HiveConf object.
+	 *
+	 * @param hiveConf HiveConf instance
+	 * @return an IMetaStoreClient instance
+	 */
+	IMetaStoreClient getHiveMetastoreClient(HiveConf hiveConf);
+
+	/**
+	 * Get a list of views in the given database from the given Hive Metastore client.
+	 *
+	 * @param client       Hive Metastore client
+	 * @param databaseName the name of the database
+	 * @return A list of names of the views
+	 * @throws UnknownDBException if the database doesn't exist
+	 * @throws TException         for any other generic exceptions caused by Thrift
+	 */
+	List<String> getViews(IMetaStoreClient client, String databaseName) throws UnknownDBException, TException;
+
+	/**
+	 * Gets a function from a database with the given HMS client.
+	 *
+	 * @param client       the Hive Metastore client
+	 * @param dbName       name of the database
+	 * @param functionName name of the function
+	 * @return the Function under the specified name
+	 * @throws NoSuchObjectException if the function doesn't exist
+	 * @throws TException            for any other generic exceptions caused by Thrift
+	 */
+	Function getFunction(IMetaStoreClient client, String dbName, String functionName) throws NoSuchObjectException, TException;
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveShimLoader.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveShimLoader.java
new file mode 100644
index 0000000..ef90547
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveShimLoader.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+
+import org.apache.hive.common.util.HiveVersionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A loader that returns the HiveShim matching the Hive version on the classpath.
+ */
+public class HiveShimLoader {
+
+	private static final String HIVE_V1_VERSION_NAME = "1.2.1";
+	private static final String HIVE_V2_VERSION_NAME = "2.3.4";
+
+	private static final Map<String, HiveShim> hiveShims = new ConcurrentHashMap<>(2);
+
+	private static final Logger LOG = LoggerFactory.getLogger(HiveShimLoader.class);
+
+	private HiveShimLoader() {
+	}
+
+	public static HiveShim loadHiveShim() {
+		String version = HiveVersionInfo.getVersion();
+		return hiveShims.computeIfAbsent(version, (v) -> {
+			if (v.startsWith(HIVE_V1_VERSION_NAME)) {
+				return new HiveShimV1();
+			}
+			if (v.startsWith(HIVE_V2_VERSION_NAME)) {
+				return new HiveShimV2();
+			}
+			throw new CatalogException("Unsupported Hive version " + v);
+		});
+	}
+}
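
A sketch of the loader's contract (the version string comes from whichever Hive
jars are actually on the classpath):

    // HiveVersionInfo.getVersion() reports e.g. "1.2.1" or "2.3.4"; the loader
    // keys its ConcurrentHashMap on that string, so repeated lookups for the
    // same version return the same shim instance.
    HiveShim first = HiveShimLoader.loadHiveShim();
    HiveShim second = HiveShimLoader.loadHiveShim();
    assert first == second;
    // Any unrecognized version fails fast with a CatalogException.
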
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveShimV1.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveShimV1.java
new file mode 100644
index 0000000..1048f52
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveShimV1.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.thrift.TException;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Shim for Hive version 1.x.
+ */
+public class HiveShimV1 implements HiveShim {
+
+	@Override
+	public IMetaStoreClient getHiveMetastoreClient(HiveConf hiveConf) {
+		try {
+			Method method = RetryingMetaStoreClient.class.getMethod("getProxy", HiveConf.class);
+			// getProxy is a static method
+			return (IMetaStoreClient) method.invoke(null, hiveConf);
+		} catch (Exception ex) {
+			throw new CatalogException("Failed to create Hive Metastore client", ex);
+		}
+	}
+
+	@Override
+	// The 1.x client doesn't support filtering tables by type, so we have to fetch all tables and filter them ourselves
+	public List<String> getViews(IMetaStoreClient client, String databaseName) throws UnknownDBException, TException {
+		// We don't need reflection here, because client.getAllTables(String) is available
+		// across all supported versions.
+		List<String> tableNames = client.getAllTables(databaseName);
+		List<String> views = new ArrayList<>();
+		for (String name : tableNames) {
+			Table table = client.getTable(databaseName, name);
+			String viewDef = table.getViewOriginalText();
+			if (viewDef != null && !viewDef.isEmpty()) {
+				views.add(table.getTableName());
+			}
+		}
+		return views;
+	}
+
+	@Override
+	public Function getFunction(IMetaStoreClient client, String dbName, String functionName) throws NoSuchObjectException, TException {
+		try {
+			// hive-1.x doesn't throw NoSuchObjectException if the function doesn't exist; instead it throws a MetaException
+			return client.getFunction(dbName, functionName);
+		} catch (MetaException e) {
+			if (e.getCause() instanceof NoSuchObjectException) {
+				throw (NoSuchObjectException) e.getCause();
+			}
+			throw e;
+		}
+	}
+}
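
The reflective lookup above is needed because the static factory on
RetryingMetaStoreClient has a different shape in each supported version line; the
two overloads the shims bind to at runtime are:

    // hive-1.x line (HiveShimV1 above):
    Method v1 = RetryingMetaStoreClient.class.getMethod("getProxy", HiveConf.class);
    // hive-2.x line (HiveShimV2 below):
    Method v2 = RetryingMetaStoreClient.class.getMethod("getProxy", HiveConf.class, Boolean.TYPE);
    // Resolving the overload at runtime keeps a single connector binary compatible
    // with either client jar, moving the check from compile time to run time.
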
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveShimV2.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveShimV2.java
new file mode 100644
index 0000000..fefb48f
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveShimV2.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.thrift.TException;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+
+/**
+ * Shim for Hive version 2.x.
+ */
+public class HiveShimV2 implements HiveShim {
+
+	@Override
+	public IMetaStoreClient getHiveMetastoreClient(HiveConf hiveConf) {
+		try {
+			Method method = RetryingMetaStoreClient.class.getMethod("getProxy", HiveConf.class, Boolean.TYPE);
+			// getProxy is a static method
+			return (IMetaStoreClient) method.invoke(null, hiveConf, true);
+		} catch (Exception ex) {
+			throw new CatalogException("Failed to create Hive Metastore client", ex);
+		}
+	}
+
+	@Override
+	public List<String> getViews(IMetaStoreClient client, String databaseName) throws UnknownDBException, TException {
+		try {
+			Method method = client.getClass().getMethod("getTables", String.class, String.class, TableType.class);
+			return (List<String>) method.invoke(client, databaseName, null, TableType.VIRTUAL_VIEW);
+		} catch (InvocationTargetException ite) {
+			Throwable targetEx = ite.getTargetException();
+			if (targetEx instanceof TException) {
+				throw (TException) targetEx;
+			} else {
+				throw new CatalogException(String.format("Failed to get views for %s", databaseName), targetEx);
+			}
+		} catch (NoSuchMethodException | IllegalAccessException e) {
+			throw new CatalogException(String.format("Failed to get views for %s", databaseName), e);
+		}
+	}
+
+	@Override
+	public Function getFunction(IMetaStoreClient client, String dbName, String functionName) throws NoSuchObjectException, TException {
+		return client.getFunction(dbName, functionName);
+	}
+}