Posted to commits@flink.apache.org by bl...@apache.org on 2019/05/21 02:29:05 UTC

[flink] branch master updated: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCatalog

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fba21a  [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCatalog
3fba21a is described below

commit 3fba21a0ce5534b8c95a2bb9e8547b0061e5bb8f
Author: Xuefu Zhang <xu...@alibaba-inc.com>
AuthorDate: Sun May 19 08:26:09 2019 -0700

    [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCatalog
    
    This PR combines GenericHiveMetastoreCatalog and HiveCatalog into a single class.
    
    This closes #8480.
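    For illustration, a minimal sketch of how the merged HiveCatalog can now persist
    both native Hive metadata and generic Flink metadata through one class, based on
    the constructors and methods in this change. The metastore URI and database names
    below are placeholders, not values from the commit.

        import org.apache.flink.table.catalog.GenericCatalogDatabase;
        import org.apache.flink.table.catalog.hive.HiveCatalog;
        import org.apache.flink.table.catalog.hive.HiveCatalogDatabase;

        import java.util.HashMap;

        public class HiveCatalogSketch {
            public static void main(String[] args) throws Exception {
                // Placeholder metastore URI; requires a reachable Hive metastore.
                HiveCatalog catalog = new HiveCatalog("myhive", "thrift://localhost:9083");
                catalog.open();

                // A native Hive database: stored as-is in the metastore.
                catalog.createDatabase(
                    "hive_db",
                    new HiveCatalogDatabase(new HashMap<>(), "a native Hive database"),
                    false);

                // A generic Flink database: its properties are masked with the
                // "flink." prefix before being written to the metastore.
                catalog.createDatabase(
                    "flink_db",
                    new GenericCatalogDatabase(new HashMap<>(), "a generic Flink database"),
                    false);

                catalog.close();
            }
        }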
---
 .../catalog/hive/GenericHiveMetastoreCatalog.java  | 357 --------------
 .../flink/table/catalog/hive/HiveCatalog.java      | 519 ++++++++++++++++++---
 .../flink/table/catalog/hive/HiveCatalogBase.java  | 432 -----------------
 .../table/catalog/hive/HiveCatalogDatabase.java    |  33 +-
 .../flink/table/catalog/hive/HiveCatalogTable.java |  58 +--
 .../flink/table/catalog/hive/HiveCatalogView.java  |  64 +--
 .../hive/GenericHiveMetastoreCatalogTest.java      |   2 +-
 .../flink/table/catalog/hive/HiveTestUtils.java    |   7 -
 ...gDatabase.java => AbstractCatalogDatabase.java} |  30 +-
 ...gDatabase.java => AbstractCatalogFunction.java} |  41 +-
 ...Database.java => AbstractCatalogPartition.java} |  29 +-
 ...CatalogTable.java => AbstractCatalogTable.java} |  40 +-
 ...icCatalogView.java => AbstractCatalogView.java} |  47 +-
 .../table/catalog/GenericCatalogDatabase.java      |  31 +-
 .../table/catalog/GenericCatalogFunction.java      |  39 +-
 .../table/catalog/GenericCatalogPartition.java     |  23 +-
 .../flink/table/catalog/GenericCatalogTable.java   |  62 +--
 .../flink/table/catalog/GenericCatalogView.java    |  71 +--
 .../table/catalog/GenericInMemoryCatalog.java      |   5 +-
 .../table/catalog/GenericInMemoryCatalogTest.java  |  10 +-
 .../apache/flink/table/catalog/CatalogTable.java   |   9 +
 21 files changed, 606 insertions(+), 1303 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
deleted file mode 100644
index b906e5c..0000000
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
+++ /dev/null
@@ -1,357 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog.hive;
-
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogDatabase;
-import org.apache.flink.table.catalog.CatalogFunction;
-import org.apache.flink.table.catalog.CatalogPartition;
-import org.apache.flink.table.catalog.CatalogPartitionSpec;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogView;
-import org.apache.flink.table.catalog.GenericCatalogDatabase;
-import org.apache.flink.table.catalog.GenericCatalogTable;
-import org.apache.flink.table.catalog.GenericCatalogView;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
-import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
-import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
-import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
-import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
-import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
-import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
-import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-/**
- * A catalog that persists all Flink streaming and batch metadata by using Hive metastore as a persistent storage.
- */
-public class GenericHiveMetastoreCatalog extends HiveCatalogBase {
-	private static final Logger LOG = LoggerFactory.getLogger(GenericHiveMetastoreCatalog.class);
-
-	// Prefix used to distinguish properties created by Hive and Flink,
-	// as Hive metastore has its own properties created upon table creation and migration between different versions of metastore.
-	private static final String FLINK_PROPERTY_PREFIX = "flink.";
-
-	public GenericHiveMetastoreCatalog(String catalogName, String hivemetastoreURI) {
-		super(catalogName, hivemetastoreURI);
-
-		LOG.info("Created GenericHiveMetastoreCatalog '{}'", catalogName);
-	}
-
-	public GenericHiveMetastoreCatalog(String catalogName, HiveConf hiveConf) {
-		super(catalogName, hiveConf);
-	}
-
-	public GenericHiveMetastoreCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf) {
-		super(catalogName, defaultDatabase, hiveConf);
-
-		LOG.info("Created GenericHiveMetastoreCatalog '{}'", catalogName);
-	}
-
-	// ------ databases ------
-
-	@Override
-	protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) {
-		return new GenericCatalogDatabase(
-			retrieveFlinkProperties(hiveDatabase.getParameters()),
-			hiveDatabase.getDescription()
-		);
-	}
-
-	@Override
-	protected Database createHiveDatabase(String databaseName, CatalogDatabase catalogDatabase) {
-		return new Database(
-			databaseName,
-			catalogDatabase.getComment(),
-			// HDFS location URI which GenericCatalogDatabase shouldn't care
-			null,
-			maskFlinkProperties(catalogDatabase.getProperties()));
-	}
-
-	// ------ tables and views------
-
-	@Override
-	protected void validateCatalogBaseTable(CatalogBaseTable table)
-			throws CatalogException {
-		if (!(table instanceof GenericCatalogTable) && !(table instanceof GenericCatalogView)) {
-			throw new CatalogException(
-				"GenericHiveMetastoreCatalog can only operate on GenericCatalogTable and GenericCatalogView.");
-		}
-	}
-
-	@Override
-	protected CatalogBaseTable createCatalogBaseTable(Table hiveTable) {
-		// Table schema
-		TableSchema tableSchema = HiveTableUtil.createTableSchema(
-			hiveTable.getSd().getCols(), hiveTable.getPartitionKeys());
-
-		// Table properties
-		Map<String, String> properties = retrieveFlinkProperties(hiveTable.getParameters());
-
-		// Table comment
-		String comment = properties.remove(HiveTableConfig.TABLE_COMMENT);
-
-		// Partition keys
-		List<String> partitionKeys = new ArrayList<>();
-
-		if (!hiveTable.getPartitionKeys().isEmpty()) {
-			partitionKeys = hiveTable.getPartitionKeys().stream()
-				.map(fs -> fs.getName())
-				.collect(Collectors.toList());
-		}
-
-		if (TableType.valueOf(hiveTable.getTableType()) == TableType.VIRTUAL_VIEW) {
-			return new GenericCatalogView(
-				hiveTable.getViewOriginalText(),
-				hiveTable.getViewExpandedText(),
-				tableSchema,
-				properties,
-				comment
-			);
-		} else {
-			return new GenericCatalogTable(
-				tableSchema, partitionKeys, properties, comment);
-		}
-	}
-
-	@Override
-	protected Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) {
-		Map<String, String> properties = new HashMap<>(table.getProperties());
-
-		// Table comment
-		properties.put(HiveTableConfig.TABLE_COMMENT, table.getComment());
-
-		Table hiveTable = new Table();
-		hiveTable.setDbName(tablePath.getDatabaseName());
-		hiveTable.setTableName(tablePath.getObjectName());
-		hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
-
-		// Table properties
-		hiveTable.setParameters(maskFlinkProperties(properties));
-
-		// Hive table's StorageDescriptor
-		StorageDescriptor sd = new StorageDescriptor();
-		sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>()));
-
-		List<FieldSchema> allColumns = HiveTableUtil.createHiveColumns(table.getSchema());
-
-		// Table columns and partition keys
-		if (table instanceof CatalogTable) {
-			CatalogTable catalogTable = (CatalogTable) table;
-
-			if (catalogTable.isPartitioned()) {
-				int partitionKeySize = catalogTable.getPartitionKeys().size();
-				List<FieldSchema> regularColumns = allColumns.subList(0, allColumns.size() - partitionKeySize);
-				List<FieldSchema> partitionColumns = allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size());
-
-				sd.setCols(regularColumns);
-				hiveTable.setPartitionKeys(partitionColumns);
-			} else {
-				sd.setCols(allColumns);
-				hiveTable.setPartitionKeys(new ArrayList<>());
-			}
-		} else if (table instanceof CatalogView) {
-			CatalogView view = (CatalogView) table;
-
-			// TODO: [FLINK-12398] Support partitioned view in catalog API
-			sd.setCols(allColumns);
-			hiveTable.setPartitionKeys(new ArrayList<>());
-
-			hiveTable.setViewOriginalText(view.getOriginalQuery());
-			hiveTable.setViewExpandedText(view.getExpandedQuery());
-			hiveTable.setTableType(TableType.VIRTUAL_VIEW.name());
-		} else {
-			throw new CatalogException(
-				"GenericHiveMetastoreCatalog only supports CatalogTable and CatalogView");
-		}
-
-		hiveTable.setSd(sd);
-
-		return hiveTable;
-	}
-
-	// ------ partitions ------
-
-	@Override
-	public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
-			throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
-			throws PartitionNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists)
-			throws PartitionNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
-			throws TableNotExistException, TableNotPartitionedException, CatalogException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
-			throws TableNotExistException, TableNotPartitionedException, CatalogException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
-			throws PartitionNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
-			throws CatalogException {
-		throw new UnsupportedOperationException();
-	}
-
-	// ------ functions ------
-
-	@Override
-	public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
-			throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
-			throws FunctionNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
-			throws FunctionNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public List<String> listFunctions(String dbName) throws DatabaseNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public boolean functionExists(ObjectPath functionPath) throws CatalogException {
-		throw new UnsupportedOperationException();
-	}
-
-	// ------ statistics ------
-
-	@Override
-	public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists)
-			throws TableNotExistException, CatalogException {
-
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists)
-			throws TableNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics,
-			boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics,
-			boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
-			throws PartitionNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
-			throws PartitionNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
-	}
-
-	// ------ utils ------
-
-	/**
-	 * Filter out Hive-created properties, and return Flink-created properties.
-	 */
-	private static Map<String, String> retrieveFlinkProperties(Map<String, String> hiveTableParams) {
-		return hiveTableParams.entrySet().stream()
-			.filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
-			.collect(Collectors.toMap(e -> e.getKey().replace(FLINK_PROPERTY_PREFIX, ""), e -> e.getValue()));
-	}
-
-	/**
-	 * Add a prefix to Flink-created properties to distinguish them from Hive-created properties.
-	 */
-	private static Map<String, String> maskFlinkProperties(Map<String, String> properties) {
-		return properties.entrySet().stream()
-			.filter(e -> e.getKey() != null && e.getValue() != null)
-			.collect(Collectors.toMap(e -> FLINK_PROPERTY_PREFIX + e.getKey(), e -> e.getValue()));
-	}
-}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 66826eb..13c5fde 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -19,32 +19,54 @@
 package org.apache.flink.table.catalog.hive;
 
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.AbstractCatalogTable;
+import org.apache.flink.table.catalog.AbstractCatalogView;
+import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogFunction;
 import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.GenericCatalogDatabase;
+import org.apache.flink.table.catalog.GenericCatalogTable;
+import org.apache.flink.table.catalog.GenericCatalogView;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
 import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
 import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
 import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.util.StringUtils;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,117 +76,486 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A catalog implementation for Hive.
  */
-public class HiveCatalog extends HiveCatalogBase {
+public class HiveCatalog implements Catalog {
 	private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);
+	private static final String DEFAULT_DB = "default";
 
-	public HiveCatalog(String catalogName, String hivemetastoreURI) {
-		super(catalogName, hivemetastoreURI);
+	// Prefix used to distinguish properties created by Hive and Flink,
+	// as Hive metastore has its own properties created upon table creation and migration between different versions of metastore.
+	private static final String FLINK_PROPERTY_PREFIX = "flink.";
+	private static final String FLINK_PROPERTY_IS_GENERIC = FLINK_PROPERTY_PREFIX + GenericInMemoryCatalog.FLINK_IS_GENERIC_KEY;
 
-		LOG.info("Created HiveCatalog '{}'", catalogName);
+	protected final String catalogName;
+	protected final HiveConf hiveConf;
+
+	private final String defaultDatabase;
+	protected IMetaStoreClient client;
+
+	public HiveCatalog(String catalogName, String hivemetastoreURI) {
+		this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI));
 	}
 
 	public HiveCatalog(String catalogName, HiveConf hiveConf) {
-		super(catalogName, hiveConf);
+		this(catalogName, DEFAULT_DB, hiveConf);
+	}
+
+	public HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf) {
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty");
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), "defaultDatabase cannot be null or empty");
+		this.catalogName = catalogName;
+		this.defaultDatabase = defaultDatabase;
+		this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null");
 
 		LOG.info("Created HiveCatalog '{}'", catalogName);
 	}
 
+	private static HiveConf getHiveConf(String hiveMetastoreURI) {
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), "hiveMetastoreURI cannot be null or empty");
+
+		HiveConf hiveConf = new HiveConf();
+		hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetastoreURI);
+		return hiveConf;
+	}
+
+	private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
+		try {
+			return RetryingMetaStoreClient.getProxy(
+				hiveConf,
+				null,
+				null,
+				HiveMetaStoreClient.class.getName(),
+				true);
+		} catch (MetaException e) {
+			throw new CatalogException("Failed to create Hive metastore client", e);
+		}
+	}
+
+	@Override
+	public void open() throws CatalogException {
+		if (client == null) {
+			client = getMetastoreClient(hiveConf);
+			LOG.info("Connected to Hive metastore");
+		}
+
+		if (!databaseExists(defaultDatabase)) {
+			throw new CatalogException(String.format("Configured default database %s doesn't exist in catalog %s.",
+				defaultDatabase, catalogName));
+		}
+	}
+
+	@Override
+	public void close() throws CatalogException {
+		if (client != null) {
+			client.close();
+			client = null;
+			LOG.info("Close connection to Hive metastore");
+		}
+	}
+
 	// ------ databases ------
 
+	public String getDefaultDatabase() throws CatalogException {
+		return defaultDatabase;
+	}
+
 	@Override
-	protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) {
-		return new HiveCatalogDatabase(
-			hiveDatabase.getParameters(),
-			hiveDatabase.getLocationUri(),
-			hiveDatabase.getDescription());
+	public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
+		Database hiveDatabase = getHiveDatabase(databaseName);
+
+		Map<String, String> properties = hiveDatabase.getParameters();
+		boolean isGeneric = Boolean.valueOf(properties.get(FLINK_PROPERTY_IS_GENERIC));
+		return !isGeneric ? new HiveCatalogDatabase(properties, hiveDatabase.getLocationUri(), hiveDatabase.getDescription()) :
+			new GenericCatalogDatabase(retrieveFlinkProperties(properties), hiveDatabase.getDescription());
 	}
 
 	@Override
-	protected Database createHiveDatabase(String databaseName, CatalogDatabase catalogDatabase) {
-		HiveCatalogDatabase hiveCatalogDatabase = (HiveCatalogDatabase) catalogDatabase;
+	public void createDatabase(String databaseName, CatalogDatabase database, boolean ignoreIfExists)
+			throws DatabaseAlreadyExistException, CatalogException {
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName cannot be null or empty");
+		checkNotNull(database, "database cannot be null");
+
+		Database hiveDatabase;
+		if (database instanceof HiveCatalogDatabase) {
+			hiveDatabase = instantiateHiveDatabase(databaseName, (HiveCatalogDatabase) database);
+		} else if (database instanceof GenericCatalogDatabase) {
+			hiveDatabase = instantiateHiveDatabase(databaseName, (GenericCatalogDatabase) database);
+		} else {
+			throw new CatalogException(String.format("Unsupported catalog database type %s", database.getClass()), null);
+		}
 
-		return new Database(
-			databaseName,
-			catalogDatabase.getComment(),
-			hiveCatalogDatabase.getLocation(),
-			hiveCatalogDatabase.getProperties());
+		try {
+			client.createDatabase(hiveDatabase);
+		} catch (AlreadyExistsException e) {
+			if (!ignoreIfExists) {
+				throw new DatabaseAlreadyExistException(catalogName, hiveDatabase.getName());
+			}
+		} catch (TException e) {
+			throw new CatalogException(String.format("Failed to create database %s", hiveDatabase.getName()), e);
+		}
 	}
 
-	// ------ tables and views------
+	private static Database instantiateHiveDatabase(String databaseName, HiveCatalogDatabase database) {
+		return new Database(databaseName,
+			database.getComment(),
+			database.getLocation(),
+			database.getProperties());
+	}
+
+	private static Database instantiateHiveDatabase(String databaseName, GenericCatalogDatabase database) {
+		Map<String, String> properties = database.getProperties();
+
+		return new Database(databaseName,
+			database.getComment(),
+			// HDFS location URI which GenericCatalogDatabase shouldn't care
+			null,
+			maskFlinkProperties(properties));
+	}
 
 	@Override
-	protected void validateCatalogBaseTable(CatalogBaseTable table)
-			throws CatalogException {
-		if (!(table instanceof HiveCatalogTable) && !(table instanceof HiveCatalogView)) {
+	public void alterDatabase(String databaseName, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
+			throws DatabaseNotExistException, CatalogException {
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName cannot be null or empty");
+		checkNotNull(newDatabase, "newDatabase cannot be null");
+
+		Database newHiveDatabase;
+		if (newDatabase instanceof HiveCatalogDatabase) {
+			newHiveDatabase = instantiateHiveDatabase(databaseName, (HiveCatalogDatabase) newDatabase);
+		} else if (newDatabase instanceof GenericCatalogDatabase) {
+			newHiveDatabase = instantiateHiveDatabase(databaseName, (GenericCatalogDatabase) newDatabase);
+		} else {
+			throw new CatalogException(String.format("Unsupported catalog database type %s", newDatabase.getClass()), null);
+		}
+
+		try {
+			if (databaseExists(databaseName)) {
+				client.alterDatabase(databaseName, newHiveDatabase);
+			} else if (!ignoreIfNotExists) {
+				throw new DatabaseNotExistException(catalogName, databaseName);
+			}
+		} catch (TException e) {
+			throw new CatalogException(String.format("Failed to alter database %s", databaseName), e);
+		}
+	}
+
+	@Override
+	public List<String> listDatabases() throws CatalogException {
+		try {
+			return client.getAllDatabases();
+		} catch (TException e) {
 			throw new CatalogException(
-				"HiveCatalog can only operate on HiveCatalogTable and HiveCatalogView.");
+				String.format("Failed to list all databases in %s", catalogName), e);
 		}
 	}
 
 	@Override
-	protected CatalogBaseTable createCatalogBaseTable(Table hiveTable) {
-		// Table schema
-		TableSchema tableSchema =
-			HiveTableUtil.createTableSchema(hiveTable.getSd().getCols(), hiveTable.getPartitionKeys());
+	public boolean databaseExists(String databaseName) throws CatalogException {
+		try {
+			return client.getDatabase(databaseName) != null;
+		} catch (NoSuchObjectException e) {
+			return false;
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to determine whether database %s exists or not", databaseName), e);
+		}
+	}
+
+	@Override
+	public void dropDatabase(String name, boolean ignoreIfNotExists) throws DatabaseNotExistException,
+			DatabaseNotEmptyException, CatalogException {
+		try {
+			client.dropDatabase(name, true, ignoreIfNotExists);
+		} catch (NoSuchObjectException e) {
+			if (!ignoreIfNotExists) {
+				throw new DatabaseNotExistException(catalogName, name);
+			}
+		} catch (InvalidOperationException e) {
+			throw new DatabaseNotEmptyException(catalogName, name);
+		} catch (TException e) {
+			throw new CatalogException(String.format("Failed to drop database %s", name), e);
+		}
+	}
+
+	private Database getHiveDatabase(String databaseName) throws DatabaseNotExistException {
+		try {
+			return client.getDatabase(databaseName);
+		} catch (NoSuchObjectException e) {
+			throw new DatabaseNotExistException(catalogName, databaseName);
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to get database %s from %s", databaseName, catalogName), e);
+		}
+	}
+
+	// ------ tables ------
+
+	@Override
+	public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+		checkNotNull(tablePath, "tablePath cannot be null");
+
+		Table hiveTable = getHiveTable(tablePath);
+		return instantiateHiveCatalogTable(hiveTable);
+	}
+
+	@Override
+	public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+			throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+		checkNotNull(tablePath, "tablePath cannot be null");
+		checkNotNull(table, "table cannot be null");
+
+		if (!databaseExists(tablePath.getDatabaseName())) {
+			throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName());
+		}
+
+		Table hiveTable = instantiateHiveTable(tablePath, table);
+
+		try {
+			client.createTable(hiveTable);
+		} catch (AlreadyExistsException e) {
+			if (!ignoreIfExists) {
+				throw new TableAlreadyExistException(catalogName, tablePath);
+			}
+		} catch (TException e) {
+			throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e);
+		}
+	}
+
+	@Override
+	public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
+			throws TableNotExistException, TableAlreadyExistException, CatalogException {
+		checkNotNull(tablePath, "tablePath cannot be null");
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(newTableName), "newTableName cannot be null or empty");
+
+		try {
+			// alter_table() doesn't throw a clear exception when target table doesn't exist.
+			// Thus, check the table existence explicitly
+			if (tableExists(tablePath)) {
+				ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName);
+				// alter_table() doesn't throw a clear exception when new table already exists.
+				// Thus, check the table existence explicitly
+				if (tableExists(newPath)) {
+					throw new TableAlreadyExistException(catalogName, newPath);
+				} else {
+					Table table = getHiveTable(tablePath);
+					table.setTableName(newTableName);
+					client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table);
+				}
+			} else if (!ignoreIfNotExists) {
+				throw new TableNotExistException(catalogName, tablePath);
+			}
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to rename table %s", tablePath.getFullName()), e);
+		}
+	}
+
+	@Override
+	public void alterTable(ObjectPath tablePath, CatalogBaseTable newCatalogTable, boolean ignoreIfNotExists)
+			throws TableNotExistException, CatalogException {
+		checkNotNull(tablePath, "tablePath cannot be null");
+		checkNotNull(newCatalogTable, "newCatalogTable cannot be null");
+
+		if (!tableExists(tablePath)) {
+			if (!ignoreIfNotExists) {
+				throw new TableNotExistException(catalogName, tablePath);
+			}
+			return;
+		}
+
+		Table oldTable = getHiveTable(tablePath);
+		TableType oldTableType = TableType.valueOf(oldTable.getTableType());
+
+		if (oldTableType == TableType.VIRTUAL_VIEW) {
+			if (!(newCatalogTable instanceof CatalogView)) {
+				throw new CatalogException(
+					String.format("Table types don't match. The existing table is a view, but the new catalog base table is not."));
+			}
+			// Else, do nothing
+		} else if ((oldTableType == TableType.MANAGED_TABLE)) {
+			if (!(newCatalogTable instanceof CatalogTable)) {
+				throw new CatalogException(
+					String.format("Table types don't match. The existing table is a table, but the new catalog base table is not."));
+			}
+			// Else, do nothing
+		} else {
+			throw new CatalogException(
+				String.format("Hive table type '%s' is not supported yet.",
+					oldTableType.name()));
+		}
+
+		Table newTable = instantiateHiveTable(tablePath, newCatalogTable);
+
+		// client.alter_table() requires a valid location
+		// thus, if new table doesn't have that, it reuses location of the old table
+		if (!newTable.getSd().isSetLocation()) {
+			newTable.getSd().setLocation(oldTable.getSd().getLocation());
+		}
+
+		try {
+			client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), newTable);
+		} catch (TException e) {
+			throw new CatalogException(String.format("Failed to rename table %s", tablePath.getFullName()), e);
+		}
+	}
+
+	@Override
+	public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
+		checkNotNull(tablePath, "tablePath cannot be null");
+
+		try {
+			client.dropTable(
+				tablePath.getDatabaseName(),
+				tablePath.getObjectName(),
+				// Indicate whether associated data should be deleted.
+				// Set to 'true' for now because Flink tables shouldn't have data in Hive. Can be changed later if necessary
+				true,
+				ignoreIfNotExists);
+		} catch (NoSuchObjectException e) {
+			if (!ignoreIfNotExists) {
+				throw new TableNotExistException(catalogName, tablePath);
+			}
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to drop table %s", tablePath.getFullName()), e);
+		}
+	}
+
+	@Override
+	public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName cannot be null or empty");
+
+		try {
+			return client.getAllTables(databaseName);
+		} catch (UnknownDBException e) {
+			throw new DatabaseNotExistException(catalogName, databaseName);
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to list tables in database %s", databaseName), e);
+		}
+	}
+
+	@Override
+	public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName cannot be null or empty");
+
+		try {
+			return client.getTables(
+				databaseName,
+				null, // table pattern
+				TableType.VIRTUAL_VIEW);
+		} catch (UnknownDBException e) {
+			throw new DatabaseNotExistException(catalogName, databaseName);
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to list views in database %s", databaseName), e);
+		}
+	}
+
+	@Override
+	public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+		checkNotNull(tablePath, "tablePath cannot be null");
+
+		try {
+			return client.tableExists(tablePath.getDatabaseName(), tablePath.getObjectName());
+		} catch (UnknownDBException e) {
+			return false;
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to check whether table %s exists or not.", tablePath.getFullName()), e);
+		}
+	}
+
+	private Table getHiveTable(ObjectPath tablePath) throws TableNotExistException {
+		try {
+			return client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
+		} catch (NoSuchObjectException e) {
+			throw new TableNotExistException(catalogName, tablePath);
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to get table %s from Hive metastore", tablePath.getFullName()), e);
+		}
+	}
+
+	private static CatalogBaseTable instantiateHiveCatalogTable(Table hiveTable) {
+		boolean isView = TableType.valueOf(hiveTable.getTableType()) == TableType.VIRTUAL_VIEW;
 
 		// Table properties
 		Map<String, String> properties = hiveTable.getParameters();
-
-		// Table comment
+		boolean isGeneric = Boolean.valueOf(properties.get(FLINK_PROPERTY_IS_GENERIC));
+		if (isGeneric) {
+			properties = retrieveFlinkProperties(properties);
+		}
 		String comment = properties.remove(HiveTableConfig.TABLE_COMMENT);
 
+		// Table schema
+		TableSchema tableSchema =
+			HiveTableUtil.createTableSchema(hiveTable.getSd().getCols(), hiveTable.getPartitionKeys());
+
 		// Partition keys
 		List<String> partitionKeys = new ArrayList<>();
-
 		if (!hiveTable.getPartitionKeys().isEmpty()) {
-			partitionKeys = hiveTable.getPartitionKeys().stream()
-				.map(fs -> fs.getName())
-				.collect(Collectors.toList());
+			partitionKeys = hiveTable.getPartitionKeys().stream().map(fs -> fs.getName()).collect(Collectors.toList());
 		}
 
-		if (TableType.valueOf(hiveTable.getTableType()) == TableType.VIRTUAL_VIEW) {
-			return new HiveCatalogView(
-				hiveTable.getViewOriginalText(),
-				hiveTable.getViewExpandedText(),
-				tableSchema,
-				properties,
-				comment
-			);
+		if (isView) {
+			if (isGeneric) {
+				return new GenericCatalogView(
+					hiveTable.getViewOriginalText(),
+					hiveTable.getViewExpandedText(),
+					tableSchema,
+					properties,
+					comment
+				);
+			} else {
+				return new HiveCatalogView(
+					hiveTable.getViewOriginalText(),
+					hiveTable.getViewExpandedText(),
+					tableSchema,
+					properties,
+					comment
+				);
+			}
 		} else {
-			return new HiveCatalogTable(
-				tableSchema, partitionKeys, properties, comment);
+			if (isGeneric) {
+				return new GenericCatalogTable(tableSchema, partitionKeys, properties, comment);
+			} else {
+				return new HiveCatalogTable(tableSchema, partitionKeys, properties, comment);
+			}
 		}
 	}
 
-	@Override
-	protected Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) {
-		Map<String, String> properties = new HashMap<>(table.getProperties());
-
-		// Table comment
-		properties.put(HiveTableConfig.TABLE_COMMENT, table.getComment());
-
+	private  static Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table) {
 		Table hiveTable = new Table();
 		hiveTable.setDbName(tablePath.getDatabaseName());
 		hiveTable.setTableName(tablePath.getObjectName());
 		hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
 
+		Map<String, String> properties = new HashMap<>(table.getProperties());
+		// Table comment
+		properties.put(HiveTableConfig.TABLE_COMMENT, table.getComment());
+		if (table instanceof GenericCatalogTable || table instanceof GenericCatalogView) {
+			properties = maskFlinkProperties(properties);
+		}
 		// Table properties
 		hiveTable.setParameters(properties);
 
 		// Hive table's StorageDescriptor
 		// TODO: This is very basic Hive table.
-		//  [FLINK-11479] Add input/output format and SerDeLib information for Hive tables in HiveCatalogUtil#createHiveTable
+		//  [FLINK-11479] Add input/output format and SerDeLib information for Hive tables.
 		StorageDescriptor sd = new StorageDescriptor();
+		hiveTable.setSd(sd);
 		sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>()));
 
 		List<FieldSchema> allColumns = HiveTableUtil.createHiveColumns(table.getSchema());
 
 		// Table columns and partition keys
-		if (table instanceof HiveCatalogTable) {
-			HiveCatalogTable catalogTable = (HiveCatalogTable) table;
+		if (table instanceof AbstractCatalogTable) {
+			AbstractCatalogTable catalogTable = (AbstractCatalogTable) table;
 
 			if (catalogTable.isPartitioned()) {
 				int partitionKeySize = catalogTable.getPartitionKeys().size();
@@ -177,8 +568,8 @@ public class HiveCatalog extends HiveCatalogBase {
 				sd.setCols(allColumns);
 				hiveTable.setPartitionKeys(new ArrayList<>());
 			}
-		} else if (table instanceof HiveCatalogView) {
-			HiveCatalogView view = (HiveCatalogView) table;
+		} else if (table instanceof AbstractCatalogView) {
+			AbstractCatalogView view = (AbstractCatalogView) table;
 
 			// TODO: [FLINK-12398] Support partitioned view in catalog API
 			sd.setCols(allColumns);
@@ -192,11 +583,27 @@ public class HiveCatalog extends HiveCatalogBase {
 				"HiveCatalog only supports HiveCatalogTable and HiveCatalogView");
 		}
 
-		hiveTable.setSd(sd);
-
 		return hiveTable;
 	}
 
+	/**
+	 * Filter out Hive-created properties, and return Flink-created properties.
+	 */
+	private static Map<String, String> retrieveFlinkProperties(Map<String, String> hiveTableParams) {
+		return hiveTableParams.entrySet().stream()
+			.filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
+			.collect(Collectors.toMap(e -> e.getKey().replace(FLINK_PROPERTY_PREFIX, ""), e -> e.getValue()));
+	}
+
+	/**
+	 * Add a prefix to Flink-created properties to distinguish them from Hive-created properties.
+	 */
+	private static Map<String, String> maskFlinkProperties(Map<String, String> properties) {
+		return properties.entrySet().stream()
+			.filter(e -> e.getKey() != null && e.getValue() != null)
+			.collect(Collectors.toMap(e -> FLINK_PROPERTY_PREFIX + e.getKey(), e -> e.getValue()));
+	}
+
 	// ------ partitions ------
 
 	@Override
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
deleted file mode 100644
index ea7e221..0000000
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
+++ /dev/null
@@ -1,432 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog.hive;
-
-import org.apache.flink.table.catalog.Catalog;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogDatabase;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogView;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
-import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.util.StringUtils;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.UnknownDBException;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Base class for catalogs backed by Hive metastore.
- */
-public abstract class HiveCatalogBase implements Catalog {
-	private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogBase.class);
-	private static final String DEFAULT_DB = "default";
-
-	protected final String catalogName;
-	protected final HiveConf hiveConf;
-
-	private final String defaultDatabase;
-	protected IMetaStoreClient client;
-
-	public HiveCatalogBase(String catalogName, String hivemetastoreURI) {
-		this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI));
-	}
-
-	public HiveCatalogBase(String catalogName, HiveConf hiveConf) {
-		this(catalogName, DEFAULT_DB, hiveConf);
-	}
-
-	public HiveCatalogBase(String catalogName, String defaultDatabase, HiveConf hiveConf) {
-		checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty");
-		checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), "defaultDatabase cannot be null or empty");
-		this.catalogName = catalogName;
-		this.defaultDatabase = defaultDatabase;
-		this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null");
-	}
-
-	private static HiveConf getHiveConf(String hiveMetastoreURI) {
-		checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), "hiveMetastoreURI cannot be null or empty");
-
-		HiveConf hiveConf = new HiveConf();
-		hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetastoreURI);
-		return hiveConf;
-	}
-
-	private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
-		try {
-			return RetryingMetaStoreClient.getProxy(
-				hiveConf,
-				null,
-				null,
-				HiveMetaStoreClient.class.getName(),
-				true);
-		} catch (MetaException e) {
-			throw new CatalogException("Failed to create Hive metastore client", e);
-		}
-	}
-
-	// ------ APIs ------
-
-	/**
-	 * Validate input base table.
-	 *
-	 * @param catalogBaseTable the base table to be validated
-	 * @throws CatalogException thrown if the input base table is invalid.
-	 */
-	protected abstract void validateCatalogBaseTable(CatalogBaseTable catalogBaseTable)
-		throws CatalogException;
-
-	/**
-	 * Create a CatalogBaseTable from a Hive table.
-	 *
-	 * @param hiveTable a Hive table
-	 * @return a CatalogBaseTable
-	 */
-	protected abstract CatalogBaseTable createCatalogBaseTable(Table hiveTable);
-
-	/**
-	 * Create a Hive table from a CatalogBaseTable.
-	 *
-	 * @param tablePath path of the table
-	 * @param table a CatalogBaseTable
-	 * @return a Hive table
-	 */
-	protected abstract Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table);
-
-	/**
-	 * Create a CatalogDatabase from a Hive database.
-	 *
-	 * @param hiveDatabase a Hive database
-	 * @return a CatalogDatabase
-	 */
-	protected abstract CatalogDatabase createCatalogDatabase(Database hiveDatabase);
-
-	/**
-	 * Create a Hive database from a CatalogDatabase.
-	 *
-	 * @param databaseName name of the database
-	 * @param catalogDatabase a CatalogDatabase
-	 * @return a Hive database
-	 */
-	protected abstract Database createHiveDatabase(String databaseName, CatalogDatabase catalogDatabase);
-
-	@Override
-	public void open() throws CatalogException {
-		if (client == null) {
-			client = getMetastoreClient(hiveConf);
-			LOG.info("Connected to Hive metastore");
-		}
-
-		if (!databaseExists(defaultDatabase)) {
-			throw new CatalogException(String.format("Configured default database %s doesn't exist in catalog %s.",
-				defaultDatabase, catalogName));
-		}
-	}
-
-	@Override
-	public void close() throws CatalogException {
-		if (client != null) {
-			client.close();
-			client = null;
-			LOG.info("Close connection to Hive metastore");
-		}
-	}
-
-	// ------ databases ------
-
-	@Override
-	public String getDefaultDatabase() throws CatalogException {
-		return defaultDatabase;
-	}
-
-	@Override
-	public CatalogDatabase getDatabase(String databaseName)
-			throws DatabaseNotExistException, CatalogException {
-		try {
-			return createCatalogDatabase(client.getDatabase(databaseName));
-		} catch (NoSuchObjectException e) {
-			throw new DatabaseNotExistException(catalogName, databaseName);
-		} catch (TException e) {
-			throw new CatalogException(
-				String.format("Failed to get database %s from %s", databaseName, catalogName), e);
-		}
-	}
-
-	@Override
-	public List<String> listDatabases() throws CatalogException {
-		try {
-			return client.getAllDatabases();
-		} catch (TException e) {
-			throw new CatalogException(
-				String.format("Failed to list all databases in %s", catalogName), e);
-		}
-	}
-
-	@Override
-	public boolean databaseExists(String databaseName) throws CatalogException {
-		try {
-			return client.getDatabase(databaseName) != null;
-		} catch (NoSuchObjectException e) {
-			return false;
-		} catch (TException e) {
-			throw new CatalogException(
-				String.format("Failed to determine whether database %s exists or not", databaseName), e);
-		}
-	}
-
-	@Override
-	public void dropDatabase(String name, boolean ignoreIfNotExists) throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
-		try {
-			client.dropDatabase(name, true, ignoreIfNotExists);
-		} catch (NoSuchObjectException e) {
-			if (!ignoreIfNotExists) {
-				throw new DatabaseNotExistException(catalogName, name, e);
-			}
-		} catch (InvalidOperationException e) {
-			throw new DatabaseNotEmptyException(catalogName, name);
-		} catch (TException e) {
-			throw new CatalogException(String.format("Failed to drop database %s", name), e);
-		}
-	}
-
-	@Override
-	public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
-			throws DatabaseAlreadyExistException, CatalogException {
-		try {
-			client.createDatabase(createHiveDatabase(name, database));
-		} catch (AlreadyExistsException e) {
-			if (!ignoreIfExists) {
-				throw new DatabaseAlreadyExistException(catalogName, name, e);
-			}
-		} catch (TException e) {
-			throw new CatalogException(String.format("Failed to create database %s", name), e);
-		}
-	}
-
-	@Override
-	public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
-			throws DatabaseNotExistException, CatalogException {
-		try {
-			if (databaseExists(name)) {
-				client.alterDatabase(name, createHiveDatabase(name, newDatabase));
-			} else if (!ignoreIfNotExists) {
-				throw new DatabaseNotExistException(catalogName, name);
-			}
-		} catch (TException e) {
-			throw new CatalogException(String.format("Failed to alter database %s", name), e);
-		}
-	}
-
-	// ------ tables ------
-
-	@Override
-	public CatalogBaseTable getTable(ObjectPath tablePath)
-			throws TableNotExistException, CatalogException {
-		return createCatalogBaseTable(getHiveTable(tablePath));
-	}
-
-	@Override
-	public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
-			throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
-		validateCatalogBaseTable(table);
-
-		if (!databaseExists(tablePath.getDatabaseName())) {
-			throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName());
-		} else {
-			try {
-				client.createTable(createHiveTable(tablePath, table));
-			} catch (AlreadyExistsException e) {
-				if (!ignoreIfExists) {
-					throw new TableAlreadyExistException(catalogName, tablePath, e);
-				}
-			} catch (TException e) {
-				throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e);
-			}
-		}
-	}
-
-	@Override
-	public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
-			throws TableNotExistException, TableAlreadyExistException, CatalogException {
-		try {
-			// alter_table() doesn't throw a clear exception when target table doesn't exist.
-			// Thus, check the table existence explicitly
-			if (tableExists(tablePath)) {
-				ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName);
-				// alter_table() doesn't throw a clear exception when new table already exists.
-				// Thus, check the table existence explicitly
-				if (tableExists(newPath)) {
-					throw new TableAlreadyExistException(catalogName, newPath);
-				} else {
-					Table table = getHiveTable(tablePath);
-					table.setTableName(newTableName);
-					client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table);
-				}
-			} else if (!ignoreIfNotExists) {
-				throw new TableNotExistException(catalogName, tablePath);
-			}
-		} catch (TException e) {
-			throw new CatalogException(
-				String.format("Failed to rename table %s", tablePath.getFullName()), e);
-		}
-	}
-
-	@Override
-	public void alterTable(ObjectPath tablePath, CatalogBaseTable newCatalogTable, boolean ignoreIfNotExists)
-			throws TableNotExistException, CatalogException {
-		validateCatalogBaseTable(newCatalogTable);
-
-		try {
-			if (!tableExists(tablePath)) {
-				if (!ignoreIfNotExists) {
-					throw new TableNotExistException(catalogName, tablePath);
-				}
-				return;
-			}
-
-			Table oldTable = getHiveTable(tablePath);
-			TableType oldTableType = TableType.valueOf(oldTable.getTableType());
-
-			if (oldTableType == TableType.VIRTUAL_VIEW) {
-				if (!(newCatalogTable instanceof CatalogView)) {
-					throw new CatalogException(
-						String.format("Table types don't match. The existing table is a view, but the new catalog base table is not."));
-				}
-				// Else, do nothing
-			} else if ((oldTableType == TableType.MANAGED_TABLE)) {
-				if (!(newCatalogTable instanceof CatalogTable)) {
-					throw new CatalogException(
-						String.format("Table types don't match. The existing table is a table, but the new catalog base table is not."));
-				}
-				// Else, do nothing
-			} else {
-				throw new CatalogException(
-					String.format("Hive table type '%s' is not supported yet.",
-						oldTableType.name()));
-			}
-
-			Table newTable = createHiveTable(tablePath, newCatalogTable);
-
-			// client.alter_table() requires a valid location
-			// thus, if new table doesn't have that, it reuses location of the old table
-			if (!newTable.getSd().isSetLocation()) {
-				newTable.getSd().setLocation(oldTable.getSd().getLocation());
-			}
-
-			client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), newTable);
-		} catch (TException e) {
-			throw new CatalogException(
-				String.format("Failed to rename table %s", tablePath.getFullName()), e);
-		}
-	}
-
-	@Override
-	public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
-			throws TableNotExistException, CatalogException {
-		try {
-			client.dropTable(
-				tablePath.getDatabaseName(),
-				tablePath.getObjectName(),
-				// Indicate whether associated data should be deleted.
-				// Set to 'true' for now because Flink tables shouldn't have data in Hive. Can be changed later if necessary
-				true,
-				ignoreIfNotExists);
-		} catch (NoSuchObjectException e) {
-			if (!ignoreIfNotExists) {
-				throw new TableNotExistException(catalogName, tablePath, e);
-			}
-		} catch (TException e) {
-			throw new CatalogException(
-				String.format("Failed to drop table %s", tablePath.getFullName()), e);
-		}
-	}
-
-	@Override
-	public List<String> listTables(String databaseName)
-			throws DatabaseNotExistException, CatalogException {
-		try {
-			return client.getAllTables(databaseName);
-		} catch (UnknownDBException e) {
-			throw new DatabaseNotExistException(catalogName, databaseName, e);
-		} catch (TException e) {
-			throw new CatalogException(
-				String.format("Failed to list tables in database %s", databaseName), e);
-		}
-	}
-
-	@Override
-	public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
-		try {
-			return client.getTables(
-				databaseName,
-				null, // table pattern
-				TableType.VIRTUAL_VIEW);
-		} catch (UnknownDBException e) {
-			throw new DatabaseNotExistException(catalogName, databaseName, e);
-		} catch (TException e) {
-			throw new CatalogException(
-				String.format("Failed to list views in database %s", databaseName), e);
-		}
-	}
-
-	@Override
-	public boolean tableExists(ObjectPath tablePath) throws CatalogException {
-		try {
-			return client.tableExists(tablePath.getDatabaseName(), tablePath.getObjectName());
-		} catch (UnknownDBException e) {
-			return false;
-		} catch (TException e) {
-			throw new CatalogException(
-				String.format("Failed to check whether table %s exists or not.", tablePath.getFullName()), e);
-		}
-	}
-
-	private Table getHiveTable(ObjectPath tablePath) throws TableNotExistException {
-		try {
-			return client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
-		} catch (NoSuchObjectException e) {
-			throw new TableNotExistException(catalogName, tablePath, e);
-		} catch (TException e) {
-			throw new CatalogException(
-				String.format("Failed to get table %s from Hive metastore", tablePath.getFullName()), e);
-		}
-	}
-}
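
The removed methods above follow a check-then-alter pattern against the Hive metastore client: because alter_table() does not fail clearly when the source or the target table is missing, existence is verified explicitly before the call, and the consolidated HiveCatalog is expected to keep the same behaviour. A caller-side sketch of that behaviour follows; the HiveCatalog constructor arguments, the HiveConf instance and the table names are illustrative and not part of this commit.

    // Sketch only; assumes a HiveCatalog built from a catalog name and a HiveConf,
    // in the spirit of HiveTestUtils.createHiveCatalog() further down in this patch.
    HiveCatalog catalog = new HiveCatalog("myhive", hiveConf);
    catalog.open();

    ObjectPath orders = new ObjectPath("default", "orders");
    if (catalog.tableExists(orders)) {
        // Throws TableAlreadyExistException if "orders_renamed" already exists, and
        // TableNotExistException if "orders" is gone and ignoreIfNotExists is false.
        catalog.renameTable(orders, "orders_renamed", false);
    }
    catalog.close();
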
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogDatabase.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogDatabase.java
index 45a6271..06ad4fc 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogDatabase.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogDatabase.java
@@ -18,61 +18,46 @@
 
 package org.apache.flink.table.catalog.hive;
 
-import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.AbstractCatalogDatabase;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * A hive catalog database implementation.
  */
-public class HiveCatalogDatabase implements CatalogDatabase {
-	// Property of the database
-	private final Map<String, String> properties;
+public class HiveCatalogDatabase extends AbstractCatalogDatabase {
 	// HDFS path of the database
 	private final String location;
-	// Comment of the database
-	private final String comment;
 
 	public HiveCatalogDatabase(Map<String, String> properties, String comment) {
-		this(properties, null, comment);
+		super(properties, comment);
+		location = null;
 	}
 
 	public HiveCatalogDatabase(Map<String, String> properties, String location, String comment) {
-		this.properties = checkNotNull(properties, "properties cannot be null");
+		super(properties, comment);
 		this.location = location;
-		this.comment = checkNotNull(comment, "comment cannot be null");
-	}
-
-	@Override
-	public Map<String, String> getProperties() {
-		return properties;
-	}
-
-	@Override
-	public String getComment() {
-		return comment;
 	}
 
 	@Override
 	public HiveCatalogDatabase copy() {
-		return new HiveCatalogDatabase(new HashMap<>(properties), location, comment);
+		return new HiveCatalogDatabase(new HashMap<>(getProperties()), location, getComment());
 	}
 
 	@Override
 	public Optional<String> getDescription() {
-		return Optional.of(comment);
+		return Optional.ofNullable(getComment());
 	}
 
 	@Override
 	public Optional<String> getDetailedDescription() {
-		return Optional.of("This is a Hive catalog database stored in memory only");
+		return Optional.of("This is a Hive catalog database");
 	}
 
 	public String getLocation() {
 		return location;
 	}
+
 }
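
HiveCatalogDatabase now only carries the HDFS location itself and inherits properties and comment handling from AbstractCatalogDatabase. A short sketch of the two constructors kept above; the property key and the location path are made-up values.

    Map<String, String> props = new HashMap<>();
    props.put("hive.db.owner", "flink");   // hypothetical property

    // Two-argument constructor: location stays null.
    HiveCatalogDatabase db = new HiveCatalogDatabase(props, "a Hive database");

    // Three-argument constructor with an explicit HDFS location.
    HiveCatalogDatabase dbWithLocation = new HiveCatalogDatabase(
        props, "hdfs:///user/hive/warehouse/mydb.db", "a Hive database with explicit location");
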
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogTable.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogTable.java
index 5a2fbff..ee3d14e 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogTable.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogTable.java
@@ -19,8 +19,8 @@
 package org.apache.flink.table.catalog.hive;
 
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.AbstractCatalogTable;
 import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogTable;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -28,30 +28,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * A Hive catalog table implementation.
  */
-public class HiveCatalogTable implements CatalogTable {
-	// Schema of the table (column names and types)
-	private final TableSchema tableSchema;
-	// Partition keys if this is a partitioned table. It's an empty set if the table is not partitioned
-	private final List<String> partitionKeys;
-	// Properties of the table
-	private final Map<String, String> properties;
-	// Comment of the table
-	private final String comment;
+public class HiveCatalogTable extends AbstractCatalogTable {
 
 	public HiveCatalogTable(
 			TableSchema tableSchema,
 			List<String> partitionKeys,
 			Map<String, String> properties,
 			String comment) {
-		this.tableSchema = checkNotNull(tableSchema, "tableSchema cannot be null");
-		this.partitionKeys = checkNotNull(partitionKeys, "partitionKeys cannot be null");
-		this.properties = checkNotNull(properties, "properties cannot be null");
-		this.comment = comment;
+		super(tableSchema, partitionKeys, properties, comment);
 	}
 
 	public HiveCatalogTable(
@@ -62,44 +49,27 @@ public class HiveCatalogTable implements CatalogTable {
 	}
 
 	@Override
-	public boolean isPartitioned() {
-		return !partitionKeys.isEmpty();
-	}
-
-	@Override
-	public List<String> getPartitionKeys() {
-		return partitionKeys;
-	}
-
-	@Override
-	public Map<String, String> getProperties() {
-		return properties;
-	}
-
-	@Override
-	public TableSchema getSchema() {
-		return tableSchema;
-	}
-
-	@Override
-	public String getComment() {
-		return comment;
-	}
-
-	@Override
 	public CatalogBaseTable copy() {
 		return new HiveCatalogTable(
-			tableSchema.copy(), new ArrayList<>(partitionKeys), new HashMap<>(properties), comment);
+			getSchema().copy(), new ArrayList<>(getPartitionKeys()), new HashMap<>(getProperties()), getComment());
 	}
 
 	@Override
 	public Optional<String> getDescription() {
-		return Optional.ofNullable(comment);
+		return Optional.ofNullable(getComment());
 	}
 
 	@Override
 	public Optional<String> getDetailedDescription() {
 		// TODO: return a detailed description
-		return Optional.ofNullable(comment);
+		return Optional.ofNullable(getComment());
 	}
+
+	@Override
+	public Map<String, String> toProperties() {
+		// TODO: output properties that are used to auto-discover TableFactory for Hive tables.
+		Map<String, String> properties = new HashMap<>();
+		return properties;
+	}
+
 }
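
A construction sketch for a partitioned HiveCatalogTable using the constructor kept above; the TableSchema builder calls, column names and partition key are illustrative.

    TableSchema schema = TableSchema.builder()
        .field("name", Types.STRING)
        .field("age", Types.INT)
        .build();

    HiveCatalogTable table = new HiveCatalogTable(
        schema,
        Collections.singletonList("dt"),   // partition keys
        new HashMap<>(),                   // table properties
        "a partitioned Hive table");

    // isPartitioned() is derived from the partition keys in AbstractCatalogTable.
    assert table.isPartitioned();
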
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogView.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogView.java
index 9930709..927e6c7 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogView.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogView.java
@@ -19,36 +19,17 @@
 package org.apache.flink.table.catalog.hive;
 
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.AbstractCatalogView;
 import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogView;
-import org.apache.flink.util.StringUtils;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * A Hive catalog view implementation.
  */
-public class HiveCatalogView implements CatalogView {
-	// Original text of the view definition.
-	private final String originalQuery;
-
-	// Expanded text of the original view definition
-	// This is needed because the context such as current DB is
-	// lost after the session, in which view is defined, is gone.
-	// Expanded query text takes care of the this, as an example.
-	private final String expandedQuery;
-
-	// Schema of the view (column names and types)
-	private final TableSchema tableSchema;
-	// Properties of the view
-	private final Map<String, String> properties;
-	// Comment of the view
-	private final String comment;
+public class HiveCatalogView extends AbstractCatalogView {
 
 	public HiveCatalogView(
 			String originalQuery,
@@ -56,55 +37,24 @@ public class HiveCatalogView implements CatalogView {
 			TableSchema tableSchema,
 			Map<String, String> properties,
 			String comment) {
-		checkArgument(!StringUtils.isNullOrWhitespaceOnly(originalQuery), "original query cannot be null or empty");
-		checkArgument(!StringUtils.isNullOrWhitespaceOnly(expandedQuery), "expanded query cannot be null or empty");
-
-		this.originalQuery = originalQuery;
-		this.expandedQuery = expandedQuery;
-		this.tableSchema = checkNotNull(tableSchema, "tableSchema cannot be null");
-		this.properties = checkNotNull(properties, "properties cannot be null");
-		this.comment = comment;
-	}
-
-	@Override
-	public String getOriginalQuery() {
-		return originalQuery;
-	}
-
-	@Override
-	public String getExpandedQuery() {
-		return expandedQuery;
-	}
-
-	@Override
-	public Map<String, String> getProperties() {
-		return properties;
-	}
-
-	@Override
-	public TableSchema getSchema() {
-		return tableSchema;
-	}
-
-	@Override
-	public String getComment() {
-		return comment;
+		super(originalQuery, expandedQuery, tableSchema, properties, comment);
 	}
 
 	@Override
 	public CatalogBaseTable copy() {
 		return new HiveCatalogView(
-			originalQuery, expandedQuery, tableSchema.copy(), new HashMap<>(properties), comment);
+			this.getOriginalQuery(), this.getExpandedQuery(), this.getSchema().copy(), new HashMap<>(this.getProperties()), getComment());
 	}
 
 	@Override
 	public Optional<String> getDescription() {
-		return Optional.ofNullable(comment);
+		return Optional.ofNullable(getComment());
 	}
 
 	@Override
 	public Optional<String> getDetailedDescription() {
 		// TODO: return a detailed description
-		return Optional.ofNullable(comment);
+		return Optional.ofNullable(getComment());
 	}
+
 }
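
HiveCatalogView keeps both the original and the expanded query text, since the session context (such as the current database) in which the view was defined is lost afterwards. A construction sketch with illustrative query strings:

    TableSchema schema = TableSchema.builder()
        .field("id", Types.LONG)
        .build();

    HiveCatalogView view = new HiveCatalogView(
        "SELECT * FROM orders",                           // original query
        "SELECT `orders`.`id` FROM `default`.`orders`",   // expanded, database-qualified query
        schema,
        new HashMap<>(),
        "a view over orders");
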
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
index 24e3439..5c69f50 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
@@ -44,7 +44,7 @@ public class GenericHiveMetastoreCatalogTest extends CatalogTestBase {
 
 	@BeforeClass
 	public static void init() throws IOException {
-		catalog = HiveTestUtils.createGenericHiveMetastoreCatalog();
+		catalog = HiveTestUtils.createHiveCatalog();
 		catalog.open();
 	}
 
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
index 2618c8c..0d4367a 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
@@ -34,13 +34,6 @@ public class HiveTestUtils {
 	private static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
 
 	/**
-	 * Create a GenericHiveMetastoreCatalog with an embedded Hive Metastore.
-	 */
-	public static GenericHiveMetastoreCatalog createGenericHiveMetastoreCatalog() throws IOException {
-		return new GenericHiveMetastoreCatalog(CatalogTestBase.TEST_CATALOG_NAME, getHiveConf());
-	}
-
-	/**
 	 * Create a HiveCatalog with an embedded Hive Metastore.
 	 */
 	public static HiveCatalog createHiveCatalog() throws IOException {
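
With createGenericHiveMetastoreCatalog() removed, both test suites obtain their catalog from the single remaining factory method, as GenericHiveMetastoreCatalogTest now does above. A sketch of that setup pattern; the listDatabases() call stands in for any Catalog operation under test.

    HiveCatalog catalog = HiveTestUtils.createHiveCatalog();
    catalog.open();
    try {
        catalog.listDatabases();   // exercise any Catalog operation
    } finally {
        catalog.close();
    }
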
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogDatabase.java
similarity index 63%
copy from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
copy to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogDatabase.java
index dd2b1f0..424048e 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogDatabase.java
@@ -18,24 +18,20 @@
 
 package org.apache.flink.table.catalog;
 
-import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A generic catalog database implementation.
+ * An abstract class representing a database in a catalog.
  */
-public class GenericCatalogDatabase implements CatalogDatabase {
+public abstract class AbstractCatalogDatabase implements CatalogDatabase {
+	// Property of the database
 	private final Map<String, String> properties;
+	// Comment of the database
 	private final String comment;
 
-	public GenericCatalogDatabase(Map<String, String> properties) {
-		this(properties, null);
-	}
-
-	public GenericCatalogDatabase(Map<String, String> properties, String comment) {
+	public AbstractCatalogDatabase(Map<String, String> properties, String comment) {
 		this.properties = checkNotNull(properties, "properties cannot be null");
 		this.comment = comment;
 	}
@@ -47,21 +43,7 @@ public class GenericCatalogDatabase implements CatalogDatabase {
 
 	@Override
 	public String getComment() {
-		return this.comment;
-	}
-
-	@Override
-	public GenericCatalogDatabase copy() {
-		return new GenericCatalogDatabase(new HashMap<>(properties), comment);
+		return comment;
 	}
 
-	@Override
-	public Optional<String> getDescription() {
-		return Optional.of(comment);
-	}
-
-	@Override
-	public Optional<String> getDetailedDescription() {
-		return Optional.of("This is a generic catalog database stored in memory only");
-	}
 }
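
AbstractCatalogDatabase holds only the shared state (properties and comment); copy(), getDescription() and getDetailedDescription() remain for subclasses to implement. A hypothetical subclass, shown only to illustrate what the base class leaves to implementors (the class name is made up):

    import org.apache.flink.table.catalog.AbstractCatalogDatabase;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Optional;

    public class MyCatalogDatabase extends AbstractCatalogDatabase {

        public MyCatalogDatabase(Map<String, String> properties, String comment) {
            super(properties, comment);
        }

        @Override
        public MyCatalogDatabase copy() {
            return new MyCatalogDatabase(new HashMap<>(getProperties()), getComment());
        }

        @Override
        public Optional<String> getDescription() {
            return Optional.ofNullable(getComment());
        }

        @Override
        public Optional<String> getDetailedDescription() {
            return Optional.of("A custom catalog database");
        }
    }
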
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogFunction.java
similarity index 58%
copy from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
copy to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogFunction.java
index dd2b1f0..f4046b0 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogFunction.java
@@ -18,50 +18,35 @@
 
 package org.apache.flink.table.catalog;
 
-import java.util.HashMap;
+import org.apache.flink.util.StringUtils;
+
 import java.util.Map;
-import java.util.Optional;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A generic catalog database implementation.
+ * An abstract catalog function implementation.
  */
-public class GenericCatalogDatabase implements CatalogDatabase {
+public abstract class AbstractCatalogFunction implements CatalogFunction {
+	private final String className; // Fully qualified class name of the function
 	private final Map<String, String> properties;
-	private final String comment;
 
-	public GenericCatalogDatabase(Map<String, String> properties) {
-		this(properties, null);
-	}
+	public AbstractCatalogFunction(String className, Map<String, String> properties) {
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(className), "className cannot be null or empty");
 
-	public GenericCatalogDatabase(Map<String, String> properties, String comment) {
+		this.className = className;
 		this.properties = checkNotNull(properties, "properties cannot be null");
-		this.comment = comment;
-	}
-
-	@Override
-	public Map<String, String> getProperties() {
-		return properties;
 	}
 
 	@Override
-	public String getComment() {
-		return this.comment;
+	public String getClassName() {
+		return this.className;
 	}
 
 	@Override
-	public GenericCatalogDatabase copy() {
-		return new GenericCatalogDatabase(new HashMap<>(properties), comment);
-	}
-
-	@Override
-	public Optional<String> getDescription() {
-		return Optional.of(comment);
+	public Map<String, String> getProperties() {
+		return this.properties;
 	}
 
-	@Override
-	public Optional<String> getDetailedDescription() {
-		return Optional.of("This is a generic catalog database stored in memory only");
-	}
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogPartition.java
similarity index 62%
copy from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
copy to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogPartition.java
index dd2b1f0..818d72a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogPartition.java
@@ -18,24 +18,18 @@
 
 package org.apache.flink.table.catalog;
 
-import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A generic catalog database implementation.
+ * An abstract catalog partition implementation.
  */
-public class GenericCatalogDatabase implements CatalogDatabase {
+public abstract class AbstractCatalogPartition implements CatalogPartition {
 	private final Map<String, String> properties;
 	private final String comment;
 
-	public GenericCatalogDatabase(Map<String, String> properties) {
-		this(properties, null);
-	}
-
-	public GenericCatalogDatabase(Map<String, String> properties, String comment) {
+	public AbstractCatalogPartition(Map<String, String> properties, String comment) {
 		this.properties = checkNotNull(properties, "properties cannot be null");
 		this.comment = comment;
 	}
@@ -45,23 +39,8 @@ public class GenericCatalogDatabase implements CatalogDatabase {
 		return properties;
 	}
 
-	@Override
 	public String getComment() {
-		return this.comment;
-	}
-
-	@Override
-	public GenericCatalogDatabase copy() {
-		return new GenericCatalogDatabase(new HashMap<>(properties), comment);
+		return comment;
 	}
 
-	@Override
-	public Optional<String> getDescription() {
-		return Optional.of(comment);
-	}
-
-	@Override
-	public Optional<String> getDetailedDescription() {
-		return Optional.of("This is a generic catalog database stored in memory only");
-	}
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogTable.java
similarity index 72%
copy from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
copy to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogTable.java
index eff8305..c65ca73 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogTable.java
@@ -21,17 +21,15 @@ package org.apache.flink.table.catalog;
 import org.apache.flink.table.api.TableSchema;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A generic catalog table implementation.
+ * An abstract catalog table.
  */
-public class GenericCatalogTable implements CatalogTable {
+public abstract class AbstractCatalogTable implements CatalogTable {
 	// Schema of the table (column names and types)
 	private final TableSchema tableSchema;
 	// Partition keys if this is a partitioned table. It's an empty list if the table is not partitioned
 	// Partition keys if this is a partitioned table. It's an empty list if the table is not partitioned
@@ -41,7 +39,14 @@ public class GenericCatalogTable implements CatalogTable {
 	// Comment of the table
 	private final String comment;
 
-	public GenericCatalogTable(
+	public AbstractCatalogTable(
+		TableSchema tableSchema,
+		Map<String, String> properties,
+		String comment) {
+		this(tableSchema, new ArrayList<>(), properties, comment);
+	}
+
+	public AbstractCatalogTable(
 			TableSchema tableSchema,
 			List<String> partitionKeys,
 			Map<String, String> properties,
@@ -52,13 +57,6 @@ public class GenericCatalogTable implements CatalogTable {
 		this.comment = comment;
 	}
 
-	public GenericCatalogTable(
-			TableSchema tableSchema,
-			Map<String, String> properties,
-			String description) {
-		this(tableSchema, new ArrayList<>(), properties, description);
-	}
-
 	@Override
 	public boolean isPartitioned() {
 		return !partitionKeys.isEmpty();
@@ -76,7 +74,7 @@ public class GenericCatalogTable implements CatalogTable {
 
 	@Override
 	public TableSchema getSchema() {
-		return this.tableSchema;
+		return tableSchema;
 	}
 
 	@Override
@@ -84,20 +82,4 @@ public class GenericCatalogTable implements CatalogTable {
 		return comment;
 	}
 
-	@Override
-	public GenericCatalogTable copy() {
-		return new GenericCatalogTable(
-			this.tableSchema.copy(), new ArrayList<>(partitionKeys), new HashMap<>(this.properties), comment);
-	}
-
-	@Override
-	public Optional<String> getDescription() {
-		return Optional.of(comment);
-	}
-
-	@Override
-	public Optional<String> getDetailedDescription() {
-		return Optional.of("This is a catalog table in an im-memory catalog");
-	}
-
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogView.java
similarity index 65%
copy from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java
copy to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogView.java
index 2a8a184..9bde0a2 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogView.java
@@ -21,17 +21,15 @@ package org.apache.flink.table.catalog;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.util.StringUtils;
 
-import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A generic catalog view implementation.
+ * An abstract catalog view.
  */
-public class GenericCatalogView implements CatalogView {
+public abstract class AbstractCatalogView implements CatalogView {
 	// Original text of the view definition.
 	private final String originalQuery;
 
@@ -41,25 +39,18 @@ public class GenericCatalogView implements CatalogView {
 	// Expanded query text takes care of this, as an example.
 	private final String expandedQuery;
 
-	// Schema of the view (column names and types)
-	private final TableSchema tableSchema;
-	// Properties of the view
+	private final TableSchema schema;
 	private final Map<String, String> properties;
-	// Comment of the view
 	private final String comment;
 
-	public GenericCatalogView(
-			String originalQuery,
-			String expandedQuery,
-			TableSchema tableSchema,
-			Map<String, String> properties,
-			String comment) {
-		checkArgument(!StringUtils.isNullOrWhitespaceOnly(originalQuery), "original query cannot be null or empty");
-		checkArgument(!StringUtils.isNullOrWhitespaceOnly(expandedQuery), "expanded query cannot be null or empty");
+	public AbstractCatalogView(String originalQuery, String expandedQuery, TableSchema schema,
+			Map<String, String> properties, String comment) {
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(originalQuery), "originalQuery cannot be null or empty");
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(expandedQuery), "expandedQuery cannot be null or empty");
 
 		this.originalQuery = originalQuery;
 		this.expandedQuery = expandedQuery;
-		this.tableSchema = checkNotNull(tableSchema, "tableSchema cannot be null");
+		this.schema = checkNotNull(schema, "schema cannot be null");
 		this.properties = checkNotNull(properties, "properties cannot be null");
 		this.comment = comment;
 	}
@@ -76,32 +67,16 @@ public class GenericCatalogView implements CatalogView {
 
 	@Override
 	public Map<String, String> getProperties() {
-		return properties;
+		return this.properties;
 	}
 
 	@Override
 	public TableSchema getSchema() {
-		return tableSchema;
+		return this.schema;
 	}
 
-	@Override
 	public String getComment() {
-		return comment;
+		return this.comment;
 	}
 
-	@Override
-	public GenericCatalogView copy() {
-		return new GenericCatalogView(this.originalQuery, this.expandedQuery, tableSchema.copy(),
-			new HashMap<>(this.properties), comment);
-	}
-
-	@Override
-	public Optional<String> getDescription() {
-		return Optional.of(comment);
-	}
-
-	@Override
-	public Optional<String> getDetailedDescription() {
-		return Optional.of("This is a catalog view in an im-memory catalog");
-	}
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
index dd2b1f0..44cf934 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
@@ -22,46 +22,29 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * A generic catalog database implementation.
  */
-public class GenericCatalogDatabase implements CatalogDatabase {
-	private final Map<String, String> properties;
-	private final String comment;
-
-	public GenericCatalogDatabase(Map<String, String> properties) {
-		this(properties, null);
-	}
+public class GenericCatalogDatabase extends AbstractCatalogDatabase {
 
 	public GenericCatalogDatabase(Map<String, String> properties, String comment) {
-		this.properties = checkNotNull(properties, "properties cannot be null");
-		this.comment = comment;
-	}
-
-	@Override
-	public Map<String, String> getProperties() {
-		return properties;
-	}
-
-	@Override
-	public String getComment() {
-		return this.comment;
+		super(properties, comment);
+		properties.put(GenericInMemoryCatalog.FLINK_IS_GENERIC_KEY, GenericInMemoryCatalog.FLINK_IS_GENERIC_VALUE);
 	}
 
 	@Override
 	public GenericCatalogDatabase copy() {
-		return new GenericCatalogDatabase(new HashMap<>(properties), comment);
+		return new GenericCatalogDatabase(new HashMap<>(getProperties()), getComment());
 	}
 
 	@Override
 	public Optional<String> getDescription() {
-		return Optional.of(comment);
+		return Optional.of(getComment());
 	}
 
 	@Override
 	public Optional<String> getDetailedDescription() {
-		return Optional.of("This is a generic catalog database stored in memory only");
+		return Optional.of("This is a generic catalog database");
 	}
+
 }
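
The generic implementations now stamp an is_generic marker into their properties at construction time, so that generic meta objects can be distinguished from native Hive ones when both end up in the same metastore-backed catalog. A short sketch, grounded in the constructor and the constants introduced in this patch:

    Map<String, String> props = new HashMap<>();
    GenericCatalogDatabase db = new GenericCatalogDatabase(props, "a generic database");

    // The constructor tags the passed-in property map.
    assert GenericInMemoryCatalog.FLINK_IS_GENERIC_VALUE
        .equals(db.getProperties().get(GenericInMemoryCatalog.FLINK_IS_GENERIC_KEY));
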
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogFunction.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogFunction.java
index 5c0176d..fbfe271 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogFunction.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogFunction.java
@@ -25,33 +25,24 @@ import java.util.Optional;
 /**
  * A generic catalog function implementation.
  */
-public class GenericCatalogFunction implements CatalogFunction {
-
-	private final String className; // Fully qualified class name of the function
-	private final Map<String, String> properties;
-
-	public GenericCatalogFunction(String className) {
-		this(className, new HashMap<>());
-	}
+public class GenericCatalogFunction extends AbstractCatalogFunction {
 
 	public GenericCatalogFunction(String className, Map<String, String> properties) {
-		this.className = className;
-		this.properties = properties;
+		super(className, properties);
+		properties.put(GenericInMemoryCatalog.FLINK_IS_GENERIC_KEY, GenericInMemoryCatalog.FLINK_IS_GENERIC_VALUE);
 	}
 
 	@Override
-	public String getClassName() {
-		return this.className;
-	}
-
-	@Override
-	public Map<String, String> getProperties() {
-		return this.properties;
+	public GenericCatalogFunction copy() {
+		return new GenericCatalogFunction(getClassName(), new HashMap<>(getProperties()));
 	}
 
 	@Override
-	public GenericCatalogFunction copy() {
-		return new GenericCatalogFunction(className, new HashMap<>(properties));
+	public String toString() {
+		return "GenericCatalogFunction{" +
+			", className='" + getClassName() + '\'' +
+			", properties=" + getProperties() +
+			'}';
 	}
 
 	@Override
@@ -61,15 +52,7 @@ public class GenericCatalogFunction implements CatalogFunction {
 
 	@Override
 	public Optional<String> getDetailedDescription() {
-		return Optional.of("This is a user-defined function in an in-memory catalog implementation");
-	}
-
-	@Override
-	public String toString() {
-		return "GenericCatalogFunction{" +
-			", className='" + className + '\'' +
-			", properties=" + properties +
-			'}';
+		return Optional.of("This is a user-defined function");
 	}
 
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogPartition.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogPartition.java
index 085278e..37bc5e1 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogPartition.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogPartition.java
@@ -25,37 +25,26 @@ import java.util.Optional;
 /**
  * A generic catalog partition implementation.
  */
-public class GenericCatalogPartition implements CatalogPartition {
-	private final Map<String, String> properties;
-
-	private String comment = "This is a generic catalog partition";
-
-	public GenericCatalogPartition(Map<String, String> properties) {
-		this.properties = properties;
-	}
+public class GenericCatalogPartition extends AbstractCatalogPartition {
 
 	public GenericCatalogPartition(Map<String, String> properties, String comment) {
-		this(properties);
-		this.comment = comment;
-	}
-
-	@Override
-	public Map<String, String> getProperties() {
-		return properties;
+		super(properties, comment);
+		properties.put(GenericInMemoryCatalog.FLINK_IS_GENERIC_KEY, GenericInMemoryCatalog.FLINK_IS_GENERIC_VALUE);
 	}
 
 	@Override
 	public CatalogPartition copy() {
-		return new GenericCatalogPartition(new HashMap<>(properties));
+		return new GenericCatalogPartition(new HashMap<>(getProperties()), getComment());
 	}
 
 	@Override
 	public Optional<String> getDescription() {
-		return Optional.of(comment);
+		return Optional.of(getComment());
 	}
 
 	@Override
 	public Optional<String> getDetailedDescription() {
 		return Optional.of("This is a generic catalog partition with detailed description");
 	}
+
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
index eff8305..0eff977 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
@@ -26,73 +26,43 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * A generic catalog table implementation.
  */
-public class GenericCatalogTable implements CatalogTable {
-	// Schema of the table (column names and types)
-	private final TableSchema tableSchema;
-	// Partition keys if this is a partitioned table. It's an empty set if the table is not partitioned
-	private final List<String> partitionKeys;
-	// Properties of the table
-	private final Map<String, String> properties;
-	// Comment of the table
-	private final String comment;
+public class GenericCatalogTable extends AbstractCatalogTable {
 
 	public GenericCatalogTable(
-			TableSchema tableSchema,
-			List<String> partitionKeys,
-			Map<String, String> properties,
-			String comment) {
-		this.tableSchema = checkNotNull(tableSchema, "tableSchema cannot be null");
-		this.partitionKeys = checkNotNull(partitionKeys, "partitionKeys cannot be null");
-		this.properties = checkNotNull(properties, "properties cannot be null");
-		this.comment = comment;
+		TableSchema tableSchema,
+		Map<String, String> properties,
+		String comment) {
+		this(tableSchema, new ArrayList<>(), properties, comment);
 	}
 
 	public GenericCatalogTable(
 			TableSchema tableSchema,
+			List<String> partitionKeys,
 			Map<String, String> properties,
-			String description) {
-		this(tableSchema, new ArrayList<>(), properties, description);
-	}
-
-	@Override
-	public boolean isPartitioned() {
-		return !partitionKeys.isEmpty();
+			String comment) {
+		super(tableSchema, partitionKeys, properties, comment);
+		properties.put(GenericInMemoryCatalog.FLINK_IS_GENERIC_KEY, GenericInMemoryCatalog.FLINK_IS_GENERIC_VALUE);
 	}
 
 	@Override
-	public List<String> getPartitionKeys() {
-		return partitionKeys;
+	public GenericCatalogTable copy() {
+		return new GenericCatalogTable(
+			getSchema().copy(), new ArrayList<>(getPartitionKeys()), new HashMap<>(getProperties()), getComment());
 	}
 
 	@Override
-	public Map<String, String> getProperties() {
+	public Map<String, String> toProperties() {
+		// TODO: Filter out ANY properties that are not needed for table discovery.
+		Map<String, String> properties = new HashMap<>();
 		return properties;
 	}
 
 	@Override
-	public TableSchema getSchema() {
-		return this.tableSchema;
-	}
-
-	@Override
-	public String getComment() {
-		return comment;
-	}
-
-	@Override
-	public GenericCatalogTable copy() {
-		return new GenericCatalogTable(
-			this.tableSchema.copy(), new ArrayList<>(partitionKeys), new HashMap<>(this.properties), comment);
-	}
-
-	@Override
 	public Optional<String> getDescription() {
-		return Optional.of(comment);
+		return Optional.of(getComment());
 	}
 
 	@Override
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java
index 2a8a184..96e019f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java
@@ -19,89 +19,36 @@
 package org.apache.flink.table.catalog;
 
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.util.StringUtils;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * A generic catalog view implementation.
  */
-public class GenericCatalogView implements CatalogView {
-	// Original text of the view definition.
-	private final String originalQuery;
-
-	// Expanded text of the original view definition
-	// This is needed because the context such as current DB is
-	// lost after the session, in which view is defined, is gone.
-	// Expanded query text takes care of the this, as an example.
-	private final String expandedQuery;
-
-	// Schema of the view (column names and types)
-	private final TableSchema tableSchema;
-	// Properties of the view
-	private final Map<String, String> properties;
-	// Comment of the view
-	private final String comment;
-
-	public GenericCatalogView(
-			String originalQuery,
-			String expandedQuery,
-			TableSchema tableSchema,
-			Map<String, String> properties,
-			String comment) {
-		checkArgument(!StringUtils.isNullOrWhitespaceOnly(originalQuery), "original query cannot be null or empty");
-		checkArgument(!StringUtils.isNullOrWhitespaceOnly(expandedQuery), "expanded query cannot be null or empty");
-
-		this.originalQuery = originalQuery;
-		this.expandedQuery = expandedQuery;
-		this.tableSchema = checkNotNull(tableSchema, "tableSchema cannot be null");
-		this.properties = checkNotNull(properties, "properties cannot be null");
-		this.comment = comment;
-	}
-
-	@Override
-	public String getOriginalQuery() {
-		return this.originalQuery;
-	}
-
-	@Override
-	public String getExpandedQuery() {
-		return this.expandedQuery;
-	}
-
-	@Override
-	public Map<String, String> getProperties() {
-		return properties;
-	}
+public class GenericCatalogView extends AbstractCatalogView {
 
-	@Override
-	public TableSchema getSchema() {
-		return tableSchema;
-	}
-
-	@Override
-	public String getComment() {
-		return comment;
+	public GenericCatalogView(String originalQuery, String expandedQuery, TableSchema schema,
+		Map<String, String> properties, String comment) {
+		super(originalQuery, expandedQuery, schema, properties, comment);
+		properties.put(GenericInMemoryCatalog.FLINK_IS_GENERIC_KEY, GenericInMemoryCatalog.FLINK_IS_GENERIC_VALUE);
 	}
 
 	@Override
 	public GenericCatalogView copy() {
-		return new GenericCatalogView(this.originalQuery, this.expandedQuery, tableSchema.copy(),
-			new HashMap<>(this.properties), comment);
+		return new GenericCatalogView(getOriginalQuery(), getExpandedQuery(), getSchema().copy(),
+			new HashMap<>(getProperties()), getComment());
 	}
 
 	@Override
 	public Optional<String> getDescription() {
-		return Optional.of(comment);
+		return Optional.of(getComment());
 	}
 
 	@Override
 	public Optional<String> getDetailedDescription() {
 		return Optional.of("This is a catalog view in an in-memory catalog");
 	}
+
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
index 6a1f5df..801aabb 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
@@ -48,6 +48,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * A generic catalog implementation that holds all meta objects in memory.
  */
 public class GenericInMemoryCatalog implements Catalog {
+	public static final String FLINK_IS_GENERIC_KEY = "is_generic";
+	public static final String FLINK_IS_GENERIC_VALUE = "true";
+
 	private static final String DEFAULT_DB = "default";
 
 	private final String defaultDatabase;
@@ -74,7 +77,7 @@ public class GenericInMemoryCatalog implements Catalog {
 		this.catalogName = name;
 		this.defaultDatabase = defaultDatabase;
 		this.databases = new LinkedHashMap<>();
-		this.databases.put(defaultDatabase, new GenericCatalogDatabase(new HashMap<>()));
+		this.databases.put(defaultDatabase, new GenericCatalogDatabase(new HashMap<>(), ""));
 		this.tables = new LinkedHashMap<>();
 		this.functions = new LinkedHashMap<>();
 		this.partitions = new LinkedHashMap<>();
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index a013aed..f3d144f 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -741,15 +741,15 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 	}
 
 	private CatalogPartition createPartition() {
-		return new GenericCatalogPartition(getBatchTableProperties());
+		return new GenericCatalogPartition(getBatchTableProperties(), "Generic batch table");
 	}
 
 	private CatalogPartition createAnotherPartition() {
-		return new GenericCatalogPartition(getBatchTableProperties());
+		return new GenericCatalogPartition(getBatchTableProperties(), "Generic batch table");
 	}
 
 	private CatalogPartition createPartition(Map<String, String> props) {
-		return new GenericCatalogPartition(props);
+		return new GenericCatalogPartition(props, "Generic catalog table");
 	}
 
 	@Override
@@ -791,11 +791,11 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 	}
 
 	protected CatalogFunction createFunction() {
-		return new GenericCatalogFunction(MyScalarFunction.class.getName());
+		return new GenericCatalogFunction(MyScalarFunction.class.getName(), new HashMap<>());
 	}
 
 	protected CatalogFunction createAnotherFunction() {
-		return new GenericCatalogFunction(MyOtherScalarFunction.class.getName());
+		return new GenericCatalogFunction(MyOtherScalarFunction.class.getName(), new HashMap<>());
 	}
 
 	/**
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
index e38e930..46e327e 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.catalog;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * Represents a table in a catalog.
@@ -37,4 +38,12 @@ public interface CatalogTable extends CatalogBaseTable {
 	 * @return partition keys of the table
 	 */
 	List<String> getPartitionKeys();
+
+	/**
+	 * Returns a property map used for table factory discovery. The properties will be used to find and create
+	 * a matching {@link org.apache.flink.table.factories.TableFactory}.
+	 *
+	 * @return a map of properties
+	 */
+	Map<String, String> toProperties();
 }
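
The new toProperties() method is the hook intended for TableFactory discovery. A hedged sketch of how such properties might be consumed; TableFactoryService and TableSourceFactory are the existing discovery entry points, but the exact call site is an assumption here, and both toProperties() implementations in this patch still return an empty map (see the TODOs above).

    // Sketch only: how a table's properties are expected to drive factory discovery.
    Map<String, String> properties = catalogTable.toProperties();
    TableSourceFactory factory = TableFactoryService.find(TableSourceFactory.class, properties);
    TableSource source = factory.createTableSource(properties);
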