You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/05/06 21:53:27 UTC

[flink] branch master updated: [FLINK-12232][hive] Support database related operations in HiveCatalog

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a3cf3f1  [FLINK-12232][hive] Support database related operations in HiveCatalog
a3cf3f1 is described below

commit a3cf3f1fc69e9ed17433cbc522c7086cead7790e
Author: bowen.li <bo...@gmail.com>
AuthorDate: Thu May 2 21:05:04 2019 -0700

    [FLINK-12232][hive] Support database related operations in HiveCatalog
    
    This PR creates HiveCatalog in the flink-connector-hive module and implements database-related APIs for HiveCatalog.
    
    This closes #8837
---
 .../catalog/hive/GenericHiveMetastoreCatalog.java  | 162 ++--------------
 .../flink/table/catalog/hive/HiveCatalog.java      | 210 +++++++++++++++++++++
 .../flink/table/catalog/hive/HiveCatalogBase.java  | 200 ++++++++++++++++++++
 .../table/catalog/hive/HiveCatalogDatabase.java    |  51 +++--
 .../flink/table/catalog/hive/HiveCatalogUtil.java  |  45 ++---
 .../hive/GenericHiveMetastoreCatalogTest.java      |   2 +-
 .../flink/table/catalog/hive/HiveCatalogTest.java  | 179 ++++++++++++++++++
 .../table/catalog/GenericCatalogDatabase.java      |  20 +-
 .../flink/table/catalog/CatalogDatabase.java       |   7 +
 .../flink/table/catalog/CatalogTestBase.java       |   5 +
 10 files changed, 679 insertions(+), 202 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
index 50ed2e9..bb431cc 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
@@ -25,10 +25,8 @@ import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.GenericCatalogDatabase;
 import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.ReadableWritableCatalog;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
@@ -38,16 +36,10 @@ import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
-import org.apache.flink.util.StringUtils;
 
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
-import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
@@ -57,173 +49,43 @@ import org.slf4j.LoggerFactory;
 
 import java.util.List;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * A catalog that persists all Flink streaming and batch metadata by using Hive metastore as a persistent storage.
  */
-public class GenericHiveMetastoreCatalog implements ReadableWritableCatalog {
+public class GenericHiveMetastoreCatalog extends HiveCatalogBase {
 	private static final Logger LOG = LoggerFactory.getLogger(GenericHiveMetastoreCatalog.class);
 
-	public static final String DEFAULT_DB = "default";
-
-	private final String catalogName;
-	private final HiveConf hiveConf;
-
-	private String currentDatabase = DEFAULT_DB;
-	private IMetaStoreClient client;
-
 	public GenericHiveMetastoreCatalog(String catalogName, String hivemetastoreURI) {
-		this(catalogName, getHiveConf(hivemetastoreURI));
-	}
+		super(catalogName, hivemetastoreURI);
 
-	public GenericHiveMetastoreCatalog(String catalogName, HiveConf hiveConf) {
-		checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty");
-		this.catalogName = catalogName;
-
-		this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null");
 		LOG.info("Created GenericHiveMetastoreCatalog '{}'", catalogName);
 	}
 
-	private static HiveConf getHiveConf(String hiveMetastoreURI) {
-		checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), "hiveMetastoreURI cannot be null or empty");
-
-		HiveConf hiveConf = new HiveConf();
-		hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetastoreURI);
-		return hiveConf;
-	}
-
-	private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
-		try {
-			return RetryingMetaStoreClient.getProxy(
-				hiveConf,
-				null,
-				null,
-				HiveMetaStoreClient.class.getName(),
-				true);
-		} catch (MetaException e) {
-			throw new CatalogException("Failed to create Hive metastore client", e);
-		}
-	}
-
-	@Override
-	public void open() throws CatalogException {
-		if (client == null) {
-			client = getMetastoreClient(hiveConf);
-			LOG.info("Connected to Hive metastore");
-		}
-	}
+	public GenericHiveMetastoreCatalog(String catalogName, HiveConf hiveConf) {
+		super(catalogName, hiveConf);
 
-	@Override
-	public void close() throws CatalogException {
-		if (client != null) {
-			client.close();
-			client = null;
-			LOG.info("Close connection to Hive metastore");
-		}
+		LOG.info("Created GenericHiveMetastoreCatalog '{}'", catalogName);
 	}
 
 	// ------ databases ------
 
 	@Override
-	public String getCurrentDatabase() throws CatalogException {
-		return currentDatabase;
-	}
-
-	@Override
-	public void setCurrentDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
-		checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
-
-		if (!databaseExists(databaseName)) {
-			throw new DatabaseNotExistException(catalogName, databaseName);
-		}
-
-		currentDatabase = databaseName;
-	}
-
-	@Override
-	public List<String> listDatabases() throws CatalogException {
-		try {
-			return client.getAllDatabases();
-		} catch (TException e) {
-			throw new CatalogException(
-				String.format("Failed to list all databases in %s", catalogName), e);
-		}
-	}
-
-	@Override
 	public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
-		Database hiveDb;
-
-		try {
-			hiveDb = client.getDatabase(databaseName);
-		} catch (NoSuchObjectException e) {
-			throw new DatabaseNotExistException(catalogName, databaseName);
-		} catch (TException e) {
-			throw new CatalogException(
-				String.format("Failed to get database %s from %s", databaseName, catalogName), e);
-		}
+		Database hiveDb = getHiveDatabase(databaseName);
 
 		return new GenericCatalogDatabase(hiveDb.getParameters(), hiveDb.getDescription());
 	}
 
 	@Override
-	public boolean databaseExists(String databaseName) throws CatalogException {
-		try {
-			return client.getDatabase(databaseName) != null;
-		} catch (NoSuchObjectException e) {
-			return false;
-		} catch (TException e) {
-			throw new CatalogException(
-				String.format("Failed to determine whether database %s exists or not", databaseName), e);
-		}
+	public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
+			throws DatabaseAlreadyExistException, CatalogException {
+		createHiveDatabase(GenericHiveMetastoreCatalogUtil.createHiveDatabase(name, database), ignoreIfExists);
 	}
 
 	@Override
-	public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) throws DatabaseAlreadyExistException, CatalogException {
-
-		try {
-			client.createDatabase(GenericHiveMetastoreCatalogUtil.createHiveDatabase(name, database));
-		} catch (AlreadyExistsException e) {
-			if (!ignoreIfExists) {
-				throw new DatabaseAlreadyExistException(catalogName, name);
-			}
-		} catch (TException e) {
-			throw new CatalogException(String.format("Failed to create database %s", name), e);
-		}
-	}
-
-	@Override
-	public void dropDatabase(String name, boolean ignoreIfNotExists) throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
-		try {
-			client.dropDatabase(name, true, ignoreIfNotExists);
-		} catch (NoSuchObjectException e) {
-			if (!ignoreIfNotExists) {
-				throw new DatabaseNotExistException(catalogName, name);
-			}
-		} catch (InvalidOperationException e) {
-			if (e.getMessage().startsWith(String.format("Database %s is not empty", name))) {
-				throw new DatabaseNotEmptyException(catalogName, name);
-			} else {
-				throw new CatalogException(String.format("Failed to drop database %s", name), e);
-			}
-		} catch (TException e) {
-			throw new CatalogException(String.format("Failed to drop database %s", name), e);
-		}
-	}
-
-	@Override
-	public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException {
-		try {
-			if (databaseExists(name)) {
-				client.alterDatabase(name, GenericHiveMetastoreCatalogUtil.createHiveDatabase(name, newDatabase));
-			} else if (!ignoreIfNotExists) {
-				throw new DatabaseNotExistException(catalogName, name);
-			}
-		} catch (TException e) {
-			throw new CatalogException(String.format("Failed to alter database %s", name), e);
-		}
+	public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
+			throws DatabaseNotExistException, CatalogException {
+		alterHiveDatabase(name, GenericHiveMetastoreCatalogUtil.createHiveDatabase(name, newDatabase), ignoreIfNotExists);
 	}
 
 	// ------ tables and views------
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
new file mode 100644
index 0000000..7f96852
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * A catalog implementation for Hive.
+ */
+public class HiveCatalog extends HiveCatalogBase {
+	private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);
+
+	public HiveCatalog(String catalogName, String hivemetastoreURI) {
+		super(catalogName, hivemetastoreURI);
+
+		LOG.info("Created HiveCatalog '{}'", catalogName);
+	}
+
+	public HiveCatalog(String catalogName, HiveConf hiveConf) {
+		super(catalogName, hiveConf);
+
+		LOG.info("Created HiveCatalog '{}'", catalogName);
+	}
+
+	// ------ databases ------
+
+	@Override
+	public CatalogDatabase getDatabase(String databaseName)
+			throws DatabaseNotExistException, CatalogException {
+		Database hiveDb = getHiveDatabase(databaseName);
+
+		return new HiveCatalogDatabase(
+			hiveDb.getParameters(), hiveDb.getLocationUri(), hiveDb.getDescription());
+	}
+
+	@Override
+	public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
+			throws DatabaseAlreadyExistException, CatalogException {
+		createHiveDatabase(HiveCatalogUtil.createHiveDatabase(name, database), ignoreIfExists);
+	}
+
+	@Override
+	public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException {
+		alterHiveDatabase(name, HiveCatalogUtil.createHiveDatabase(name, newDatabase), ignoreIfNotExists);
+	}
+
+	// ------ tables and views------
+
+	@Override
+	public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+			throws TableNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
+			throws TableNotExistException, TableAlreadyExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+			throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void alterTable(ObjectPath tableName, CatalogBaseTable newTable, boolean ignoreIfNotExists)
+			throws TableNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public List<String> listTables(String databaseName)
+			throws DatabaseNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public CatalogBaseTable getTable(ObjectPath objectPath) throws TableNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean tableExists(ObjectPath objectPath) throws CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	// ------ partitions ------
+
+	@Override
+	public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
+			throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
+			throws PartitionNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists)
+			throws PartitionNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
+			throws TableNotExistException, TableNotPartitionedException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+			throws TableNotExistException, TableNotPartitionedException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+			throws PartitionNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	// ------ functions ------
+
+	@Override
+	public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
+			throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
+			throws FunctionNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
+			throws FunctionNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public List<String> listFunctions(String dbName) throws DatabaseNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean functionExists(ObjectPath functionPath) throws CatalogException {
+		throw new UnsupportedOperationException();
+	}
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
new file mode 100644
index 0000000..b044de3
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.ReadableWritableCatalog;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for catalogs backed by Hive metastore.
+ */
+public abstract class HiveCatalogBase implements ReadableWritableCatalog {
+	private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogBase.class);
+
+	public static final String DEFAULT_DB = "default";
+
+	protected final String catalogName;
+	protected final HiveConf hiveConf;
+
+	protected String currentDatabase = DEFAULT_DB;
+	protected IMetaStoreClient client;
+
+	public HiveCatalogBase(String catalogName, String hivemetastoreURI) {
+		this(catalogName, getHiveConf(hivemetastoreURI));
+	}
+
+	public HiveCatalogBase(String catalogName, HiveConf hiveConf) {
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty");
+		this.catalogName = catalogName;
+
+		this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null");
+	}
+
+	private static HiveConf getHiveConf(String hiveMetastoreURI) {
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), "hiveMetastoreURI cannot be null or empty");
+
+		HiveConf hiveConf = new HiveConf();
+		hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetastoreURI);
+		return hiveConf;
+	}
+
+	private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
+		try {
+			return RetryingMetaStoreClient.getProxy(
+				hiveConf,
+				null,
+				null,
+				HiveMetaStoreClient.class.getName(),
+				true);
+		} catch (MetaException e) {
+			throw new CatalogException("Failed to create Hive metastore client", e);
+		}
+	}
+
+	@Override
+	public void open() throws CatalogException {
+		if (client == null) {
+			client = getMetastoreClient(hiveConf);
+			LOG.info("Connected to Hive metastore");
+		}
+	}
+
+	@Override
+	public void close() throws CatalogException {
+		if (client != null) {
+			client.close();
+			client = null;
+			LOG.info("Close connection to Hive metastore");
+		}
+	}
+
+	// ------ databases ------
+
+	@Override
+	public String getCurrentDatabase() throws CatalogException {
+		return currentDatabase;
+	}
+
+	@Override
+	public void setCurrentDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+
+		if (!databaseExists(databaseName)) {
+			throw new DatabaseNotExistException(catalogName, databaseName);
+		}
+
+		currentDatabase = databaseName;
+	}
+
+	@Override
+	public List<String> listDatabases() throws CatalogException {
+		try {
+			return client.getAllDatabases();
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to list all databases in %s", catalogName), e);
+		}
+	}
+
+	@Override
+	public boolean databaseExists(String databaseName) throws CatalogException {
+		try {
+			return client.getDatabase(databaseName) != null;
+		} catch (NoSuchObjectException e) {
+			return false;
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to determine whether database %s exists or not", databaseName), e);
+		}
+	}
+
+	@Override
+	public void dropDatabase(String name, boolean ignoreIfNotExists) throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
+		try {
+			client.dropDatabase(name, true, ignoreIfNotExists);
+		} catch (NoSuchObjectException e) {
+			if (!ignoreIfNotExists) {
+				throw new DatabaseNotExistException(catalogName, name);
+			}
+		} catch (InvalidOperationException e) {
+			throw new DatabaseNotEmptyException(catalogName, name);
+		} catch (TException e) {
+			throw new CatalogException(String.format("Failed to drop database %s", name), e);
+		}
+	}
+
+	protected Database getHiveDatabase(String databaseName) throws DatabaseNotExistException {
+		try {
+			return client.getDatabase(databaseName);
+		} catch (NoSuchObjectException e) {
+			throw new DatabaseNotExistException(catalogName, databaseName);
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to get database %s from %s", databaseName, catalogName), e);
+		}
+	}
+
+	protected void createHiveDatabase(Database hiveDatabase, boolean ignoreIfExists)
+			throws DatabaseAlreadyExistException, CatalogException {
+		try {
+			client.createDatabase(hiveDatabase);
+		} catch (AlreadyExistsException e) {
+			if (!ignoreIfExists) {
+				throw new DatabaseAlreadyExistException(catalogName, hiveDatabase.getName());
+			}
+		} catch (TException e) {
+			throw new CatalogException(String.format("Failed to create database %s", hiveDatabase.getName()), e);
+		}
+	}
+
+	protected void alterHiveDatabase(String name, Database newHiveDatabase, boolean ignoreIfNotExists)
+			throws DatabaseNotExistException, CatalogException {
+		try {
+			if (databaseExists(name)) {
+				client.alterDatabase(name, newHiveDatabase);
+			} else if (!ignoreIfNotExists) {
+				throw new DatabaseNotExistException(catalogName, name);
+			}
+		} catch (TException e) {
+			throw new CatalogException(String.format("Failed to alter database %s", name), e);
+		}
+	}
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogDatabase.java
similarity index 55%
copy from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
copy to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogDatabase.java
index 959247a..a7cf5c2 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogDatabase.java
@@ -16,38 +16,62 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.catalog;
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.util.StringUtils;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A generic catalog database implementation.
+ * A Hive catalog database implementation.
  */
-public class GenericCatalogDatabase implements CatalogDatabase {
+public class HiveCatalogDatabase implements CatalogDatabase {
+	// Property of the database
 	private final Map<String, String> properties;
+	// HDFS path of the database
+	private String location;
 	// Comment of the database
-	private String comment = "This is a generic catalog database.";
+	private String comment = "This is a hive catalog database.";
+
+	public HiveCatalogDatabase() {
+		properties = new HashMap<>();
+	}
 
-	public GenericCatalogDatabase(Map<String, String> properties) {
+	public HiveCatalogDatabase(Map<String, String> properties) {
 		this.properties = checkNotNull(properties, "properties cannot be null");
 	}
 
-	public GenericCatalogDatabase(Map<String, String> properties, String comment) {
+	public HiveCatalogDatabase(Map<String, String> properties, String comment) {
 		this(properties);
 		this.comment = checkNotNull(comment, "comment cannot be null");
 	}
 
+	public HiveCatalogDatabase(Map<String, String> properties, String location, String comment) {
+		this(properties, comment);
+
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(location), "location cannot be null or empty");
+		this.location = location;
+	}
+
+	@Override
 	public Map<String, String> getProperties() {
 		return properties;
 	}
 
 	@Override
-	public GenericCatalogDatabase copy() {
-		return new GenericCatalogDatabase(new HashMap<>(properties), comment);
+	public String getComment() {
+		return comment;
+	}
+
+	@Override
+	public HiveCatalogDatabase copy() {
+		return new HiveCatalogDatabase(new HashMap<>(properties), location, comment);
 	}
 
 	@Override
@@ -57,15 +81,10 @@ public class GenericCatalogDatabase implements CatalogDatabase {
 
 	@Override
 	public Optional<String> getDetailedDescription() {
-		return Optional.of("This is a generic catalog database stored in memory only");
+		return Optional.of("This is a Hive catalog database stored in memory only");
 	}
 
-	public String getComment() {
-		return this.comment;
+	public String getLocation() {
+		return location;
 	}
-
-	public void setComment(String comment) {
-		this.comment = comment;
-	}
-
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDatabase.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogUtil.java
similarity index 53%
copy from flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDatabase.java
copy to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogUtil.java
index 89233da..e972ba4 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDatabase.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogUtil.java
@@ -16,38 +16,33 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.catalog;
+package org.apache.flink.table.catalog.hive;
 
-import java.util.Map;
-import java.util.Optional;
+import org.apache.flink.table.catalog.CatalogDatabase;
+
+import org.apache.hadoop.hive.metastore.api.Database;
 
 /**
- * Represents a database object in a catalog.
+ * Utils to convert meta objects between Flink and Hive for HiveCatalog.
  */
-public interface CatalogDatabase {
-	/**
-	 * Get a map of properties associated with the database.
-	 */
-	Map<String, String> getProperties();
+public class HiveCatalogUtil {
 
-	/**
-	 * Get a deep copy of the CatalogDatabase instance.
-	 *
-	 * @return a copy of CatalogDatabase instance
-	 */
-	CatalogDatabase copy();
+	private HiveCatalogUtil() {
+	}
 
-	/**
-	 * Get a brief description of the database.
-	 *
-	 * @return an optional short description of the database
-	 */
-	Optional<String> getDescription();
+	// ------ Utils ------
 
 	/**
-	 * Get a detailed description of the database.
-	 *
-	 * @return an optional long description of the database
+	 * Creates a Hive database from CatalogDatabase.
 	 */
-	Optional<String> getDetailedDescription();
+	public static Database createHiveDatabase(String dbName, CatalogDatabase db) {
+		HiveCatalogDatabase hiveCatalogDatabase = (HiveCatalogDatabase) db;
+
+		return new Database(
+			dbName,
+			db.getComment(),
+			hiveCatalogDatabase.getLocation(),
+			hiveCatalogDatabase.getProperties());
+
+	}
 }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
index 2687699..76f8e08 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
@@ -95,7 +95,7 @@ public class GenericHiveMetastoreCatalogTest extends CatalogTestBase {
 
 	@Override
 	public String getBuiltInDefaultDatabase() {
-		return GenericHiveMetastoreCatalog.DEFAULT_DB;
+		return HiveCatalogBase.DEFAULT_DB;
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
new file mode 100644
index 0000000..7b132c9
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTestBase;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+/**
+ * Test for HiveCatalog.
+ */
+public class HiveCatalogTest extends CatalogTestBase {
+	@BeforeClass
+	public static void init() throws IOException {
+		catalog = HiveTestUtils.createGenericHiveMetastoreCatalog();
+		catalog.open();
+	}
+
+	// =====================
+	// HiveCatalog doesn't support table operations yet.
+	// Thus, the following tests from CatalogTestBase that involve table operations are overridden with empty bodies so they won't run against HiveCatalog.
+	// =====================
+
+	// TODO: re-enable these tests once HiveCatalog supports table operations
+	@Test
+	public void testDropDb_DatabaseNotEmptyException() throws Exception {
+	}
+
+	@Test
+	public void testCreateTable_Streaming() throws Exception {
+	}
+
+	@Test
+	public void testCreateTable_Batch() throws Exception {
+	}
+
+	@Test
+	public void testCreateTable_DatabaseNotExistException() throws Exception {
+	}
+
+	@Test
+	public void testCreateTable_TableAlreadyExistException() throws Exception {
+	}
+
+	@Test
+	public void testCreateTable_TableAlreadyExist_ignored() throws Exception {
+	}
+
+	@Test
+	public void testGetTable_TableNotExistException() throws Exception {
+	}
+
+	@Test
+	public void testGetTable_TableNotExistException_NoDb() throws Exception {
+	}
+
+	@Test
+	public void testDropTable_nonPartitionedTable() throws Exception {
+	}
+
+	@Test
+	public void testDropTable_TableNotExistException() throws Exception {
+	}
+
+	@Test
+	public void testDropTable_TableNotExist_ignored() throws Exception {
+	}
+
+	@Test
+	public void testAlterTable() throws Exception {
+	}
+
+	@Test
+	public void testAlterTable_TableNotExistException() throws Exception {
+	}
+
+	@Test
+	public void testAlterTable_TableNotExist_ignored() throws Exception {
+	}
+
+	@Test
+	public void testRenameTable_nonPartitionedTable() throws Exception {
+	}
+
+	@Test
+	public void testRenameTable_TableNotExistException() throws Exception {
+	}
+
+	@Test
+	public void testRenameTable_TableNotExistException_ignored() throws Exception {
+	}
+
+	@Test
+	public void testRenameTable_TableAlreadyExistException() throws Exception {
+	}
+
+	@Test
+	public void testTableExists() throws Exception {
+	}
+
+	// ------ utils ------
+
+	@Override
+	public String getBuiltInDefaultDatabase() {
+		return HiveCatalogBase.DEFAULT_DB;
+	}
+
+	@Override
+	public CatalogDatabase createDb() {
+		return new HiveCatalogDatabase(
+			new HashMap<String, String>() {{
+				put("k1", "v1");
+			}},
+			TEST_COMMENT
+		);
+	}
+
+	@Override
+	public CatalogDatabase createAnotherDb() {
+		return new HiveCatalogDatabase(
+			new HashMap<String, String>() {{
+				put("k2", "v2");
+			}},
+			TEST_COMMENT
+		);
+	}
+
+	@Override
+	public CatalogTable createTable() {
+		// TODO: implement this once HiveCatalog supports table operations
+		return null;
+	}
+
+	@Override
+	public CatalogTable createAnotherTable() {
+		// TODO: implement this once HiveCatalog supports table operations
+		return null;
+	}
+
+	@Override
+	public CatalogTable createStreamingTable() {
+		// TODO: implement this once HiveCatalog supports table operations
+		return null;
+	}
+
+	@Override
+	public CatalogTable createPartitionedTable() {
+		// TODO: implement this once HiveCatalog supports table operations
+		return null;
+	}
+
+	@Override
+	public CatalogTable createAnotherPartitionedTable() {
+		// TODO: implement this once HiveCatalog supports table operations
+		return null;
+	}
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
index 959247a..d5df274 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
@@ -29,9 +29,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class GenericCatalogDatabase implements CatalogDatabase {
 	private final Map<String, String> properties;
-	// Comment of the database
 	private String comment = "This is a generic catalog database.";
 
+	public GenericCatalogDatabase() {
+		this.properties = new HashMap<>();
+	}
+
 	public GenericCatalogDatabase(Map<String, String> properties) {
 		this.properties = checkNotNull(properties, "properties cannot be null");
 	}
@@ -41,11 +44,17 @@ public class GenericCatalogDatabase implements CatalogDatabase {
 		this.comment = checkNotNull(comment, "comment cannot be null");
 	}
 
+	@Override
 	public Map<String, String> getProperties() {
 		return properties;
 	}
 
 	@Override
+	public String getComment() {
+		return this.comment;
+	}
+
+	@Override
 	public GenericCatalogDatabase copy() {
 		return new GenericCatalogDatabase(new HashMap<>(properties), comment);
 	}
@@ -59,13 +68,4 @@ public class GenericCatalogDatabase implements CatalogDatabase {
 	public Optional<String> getDetailedDescription() {
 		return Optional.of("This is a generic catalog database stored in memory only");
 	}
-
-	public String getComment() {
-		return this.comment;
-	}
-
-	public void setComment(String comment) {
-		this.comment = comment;
-	}
-
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDatabase.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDatabase.java
index 89233da..8b14ff8 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDatabase.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDatabase.java
@@ -31,6 +31,13 @@ public interface CatalogDatabase {
 	Map<String, String> getProperties();
 
 	/**
+	 * Get comment of the database.
+	 *
+	 * @return comment of the database
+	 */
+	String getComment();
+
+	/**
 	 * Get a deep copy of the CatalogDatabase instance.
 	 *
 	 * @return a copy of CatalogDatabase instance
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
index 8117c82..470a9a9 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
@@ -109,11 +109,16 @@ public abstract class CatalogTestBase {
 	@Test
 	public void testSetCurrentDatabase() throws Exception {
 		assertEquals(getBuiltInDefaultDatabase(), catalog.getCurrentDatabase());
+
 		catalog.createDatabase(db2, createDb(), true);
 		catalog.setCurrentDatabase(db2);
+
 		assertEquals(db2, catalog.getCurrentDatabase());
+
 		catalog.setCurrentDatabase(getBuiltInDefaultDatabase());
+
 		assertEquals(getBuiltInDefaultDatabase(), catalog.getCurrentDatabase());
+
 		catalog.dropDatabase(db2, false);
 	}