You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/05/31 18:08:28 UTC

[flink] branch master updated: [FLINK-12678][table] Add AbstractCatalog to manage the common catalog name and default database name for catalogs

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ba648a5  [FLINK-12678][table] Add AbstractCatalog to manage the common catalog name and default database name for catalogs
ba648a5 is described below

commit ba648a57c3bf65efb657095a5c682e2a852d1bdf
Author: bowen.li <bo...@gmail.com>
AuthorDate: Wed May 29 12:07:54 2019 -0700

    [FLINK-12678][table] Add AbstractCatalog to manage the common catalog name and default database name for catalogs
---
 .../flink/table/catalog/hive/HiveCatalog.java      | 79 +++++++++-----------
 flink-table/flink-table-api-java/pom.xml           |  5 ++
 .../table/catalog/GenericInMemoryCatalog.java      | 85 ++++++++++------------
 .../flink/table/catalog/AbstractCatalog.java       | 51 +++++++++++++
 4 files changed, 128 insertions(+), 92 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 159499c..4081296 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -20,9 +20,9 @@ package org.apache.flink.table.catalog.hive;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.AbstractCatalog;
 import org.apache.flink.table.catalog.AbstractCatalogTable;
 import org.apache.flink.table.catalog.AbstractCatalogView;
-import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogFunction;
@@ -90,7 +90,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * A catalog implementation for Hive.
  */
-public class HiveCatalog implements Catalog {
+public class HiveCatalog extends AbstractCatalog {
 	private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);
 	private static final String DEFAULT_DB = "default";
 	private static final StorageFormatFactory storageFormatFactory = new StorageFormatFactory();
@@ -106,10 +106,8 @@ public class HiveCatalog implements Catalog {
 	// because Hive's Function object doesn't have properties or other place to store the flag for Flink functions.
 	private static final String FLINK_FUNCTION_PREFIX = "flink:";
 
-	protected final String catalogName;
 	protected final HiveConf hiveConf;
 
-	private final String defaultDatabase;
 	protected IMetaStoreClient client;
 
 	public HiveCatalog(String catalogName, String hivemetastoreURI) {
@@ -121,10 +119,7 @@ public class HiveCatalog implements Catalog {
 	}
 
 	public HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf) {
-		checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty");
-		checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), "defaultDatabase cannot be null or empty");
-		this.catalogName = catalogName;
-		this.defaultDatabase = defaultDatabase;
+		super(catalogName, defaultDatabase);
 		this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null");
 
 		LOG.info("Created HiveCatalog '{}'", catalogName);
@@ -158,9 +153,9 @@ public class HiveCatalog implements Catalog {
 			LOG.info("Connected to Hive metastore");
 		}
 
-		if (!databaseExists(defaultDatabase)) {
+		if (!databaseExists(getDefaultDatabase())) {
 			throw new CatalogException(String.format("Configured default database %s doesn't exist in catalog %s.",
-				defaultDatabase, catalogName));
+				getDefaultDatabase(), getCatalogName()));
 		}
 	}
 
@@ -175,10 +170,6 @@ public class HiveCatalog implements Catalog {
 
 	// ------ databases ------
 
-	public String getDefaultDatabase() throws CatalogException {
-		return defaultDatabase;
-	}
-
 	@Override
 	public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
 		Database hiveDatabase = getHiveDatabase(databaseName);
@@ -201,7 +192,7 @@ public class HiveCatalog implements Catalog {
 			client.createDatabase(hiveDatabase);
 		} catch (AlreadyExistsException e) {
 			if (!ignoreIfExists) {
-				throw new DatabaseAlreadyExistException(catalogName, hiveDatabase.getName());
+				throw new DatabaseAlreadyExistException(getCatalogName(), hiveDatabase.getName());
 			}
 		} catch (TException e) {
 			throw new CatalogException(String.format("Failed to create database %s", hiveDatabase.getName()), e);
@@ -268,7 +259,7 @@ public class HiveCatalog implements Catalog {
 			return client.getAllDatabases();
 		} catch (TException e) {
 			throw new CatalogException(
-				String.format("Failed to list all databases in %s", catalogName), e);
+				String.format("Failed to list all databases in %s", getCatalogName()), e);
 		}
 	}
 
@@ -291,10 +282,10 @@ public class HiveCatalog implements Catalog {
 			client.dropDatabase(name, true, ignoreIfNotExists);
 		} catch (NoSuchObjectException e) {
 			if (!ignoreIfNotExists) {
-				throw new DatabaseNotExistException(catalogName, name);
+				throw new DatabaseNotExistException(getCatalogName(), name);
 			}
 		} catch (InvalidOperationException e) {
-			throw new DatabaseNotEmptyException(catalogName, name);
+			throw new DatabaseNotEmptyException(getCatalogName(), name);
 		} catch (TException e) {
 			throw new CatalogException(String.format("Failed to drop database %s", name), e);
 		}
@@ -304,10 +295,10 @@ public class HiveCatalog implements Catalog {
 		try {
 			return client.getDatabase(databaseName);
 		} catch (NoSuchObjectException e) {
-			throw new DatabaseNotExistException(catalogName, databaseName);
+			throw new DatabaseNotExistException(getCatalogName(), databaseName);
 		} catch (TException e) {
 			throw new CatalogException(
-				String.format("Failed to get database %s from %s", databaseName, catalogName), e);
+				String.format("Failed to get database %s from %s", databaseName, getCatalogName()), e);
 		}
 	}
 
@@ -328,7 +319,7 @@ public class HiveCatalog implements Catalog {
 		checkNotNull(table, "table cannot be null");
 
 		if (!databaseExists(tablePath.getDatabaseName())) {
-			throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName());
+			throw new DatabaseNotExistException(getCatalogName(), tablePath.getDatabaseName());
 		}
 
 		Table hiveTable = instantiateHiveTable(tablePath, table);
@@ -337,7 +328,7 @@ public class HiveCatalog implements Catalog {
 			client.createTable(hiveTable);
 		} catch (AlreadyExistsException e) {
 			if (!ignoreIfExists) {
-				throw new TableAlreadyExistException(catalogName, tablePath);
+				throw new TableAlreadyExistException(getCatalogName(), tablePath);
 			}
 		} catch (TException e) {
 			throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e);
@@ -358,14 +349,14 @@ public class HiveCatalog implements Catalog {
 				// alter_table() doesn't throw a clear exception when new table already exists.
 				// Thus, check the table existence explicitly
 				if (tableExists(newPath)) {
-					throw new TableAlreadyExistException(catalogName, newPath);
+					throw new TableAlreadyExistException(getCatalogName(), newPath);
 				} else {
 					Table table = getHiveTable(tablePath);
 					table.setTableName(newTableName);
 					client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table);
 				}
 			} else if (!ignoreIfNotExists) {
-				throw new TableNotExistException(catalogName, tablePath);
+				throw new TableNotExistException(getCatalogName(), tablePath);
 			}
 		} catch (TException e) {
 			throw new CatalogException(
@@ -426,7 +417,7 @@ public class HiveCatalog implements Catalog {
 				ignoreIfNotExists);
 		} catch (NoSuchObjectException e) {
 			if (!ignoreIfNotExists) {
-				throw new TableNotExistException(catalogName, tablePath);
+				throw new TableNotExistException(getCatalogName(), tablePath);
 			}
 		} catch (TException e) {
 			throw new CatalogException(
@@ -441,7 +432,7 @@ public class HiveCatalog implements Catalog {
 		try {
 			return client.getAllTables(databaseName);
 		} catch (UnknownDBException e) {
-			throw new DatabaseNotExistException(catalogName, databaseName);
+			throw new DatabaseNotExistException(getCatalogName(), databaseName);
 		} catch (TException e) {
 			throw new CatalogException(
 				String.format("Failed to list tables in database %s", databaseName), e);
@@ -458,7 +449,7 @@ public class HiveCatalog implements Catalog {
 				null, // table pattern
 				TableType.VIRTUAL_VIEW);
 		} catch (UnknownDBException e) {
-			throw new DatabaseNotExistException(catalogName, databaseName);
+			throw new DatabaseNotExistException(getCatalogName(), databaseName);
 		} catch (TException e) {
 			throw new CatalogException(
 				String.format("Failed to list views in database %s", databaseName), e);
@@ -484,7 +475,7 @@ public class HiveCatalog implements Catalog {
 		try {
 			return client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
 		} catch (NoSuchObjectException e) {
-			throw new TableNotExistException(catalogName, tablePath);
+			throw new TableNotExistException(getCatalogName(), tablePath);
 		} catch (TException e) {
 			throw new CatalogException(
 				String.format("Failed to get table %s from Hive metastore", tablePath.getFullName()), e);
@@ -661,7 +652,7 @@ public class HiveCatalog implements Catalog {
 			client.add_partition(instantiateHivePartition(hiveTable, partitionSpec, partition));
 		} catch (AlreadyExistsException e) {
 			if (!ignoreIfExists) {
-				throw new PartitionAlreadyExistsException(catalogName, tablePath, partitionSpec);
+				throw new PartitionAlreadyExistsException(getCatalogName(), tablePath, partitionSpec);
 			}
 		} catch (TException e) {
 			throw new CatalogException(
@@ -681,10 +672,10 @@ public class HiveCatalog implements Catalog {
 				getOrderedFullPartitionValues(partitionSpec, getFieldNames(hiveTable.getPartitionKeys()), tablePath), true);
 		} catch (NoSuchObjectException e) {
 			if (!ignoreIfNotExists) {
-				throw new PartitionNotExistException(catalogName, tablePath, partitionSpec, e);
+				throw new PartitionNotExistException(getCatalogName(), tablePath, partitionSpec, e);
 			}
 		} catch (MetaException | TableNotExistException | PartitionSpecInvalidException e) {
-			throw new PartitionNotExistException(catalogName, tablePath, partitionSpec, e);
+			throw new PartitionNotExistException(getCatalogName(), tablePath, partitionSpec, e);
 		} catch (TException e) {
 			throw new CatalogException(
 				String.format("Failed to drop partition %s of table %s", partitionSpec, tablePath));
@@ -741,7 +732,7 @@ public class HiveCatalog implements Catalog {
 			Partition hivePartition = getHivePartition(tablePath, partitionSpec);
 			return instantiateCatalogPartition(hivePartition);
 		} catch (NoSuchObjectException | MetaException | TableNotExistException | PartitionSpecInvalidException e) {
-			throw new PartitionNotExistException(catalogName, tablePath, partitionSpec, e);
+			throw new PartitionNotExistException(getCatalogName(), tablePath, partitionSpec, e);
 		} catch (TException e) {
 			throw new CatalogException(
 				String.format("Failed to get partition %s of table %s", partitionSpec, tablePath), e);
@@ -769,7 +760,7 @@ public class HiveCatalog implements Catalog {
 				if (ignoreIfNotExists) {
 					return;
 				}
-				throw new PartitionNotExistException(catalogName, tablePath, partitionSpec);
+				throw new PartitionNotExistException(getCatalogName(), tablePath, partitionSpec);
 			}
 			Partition newHivePartition = instantiateHivePartition(hiveTable, partitionSpec, newPartition);
 			if (newHivePartition.getSd().getLocation() == null) {
@@ -782,10 +773,10 @@ public class HiveCatalog implements Catalog {
 			);
 		} catch (NoSuchObjectException e) {
 			if (!ignoreIfNotExists) {
-				throw new PartitionNotExistException(catalogName, tablePath, partitionSpec, e);
+				throw new PartitionNotExistException(getCatalogName(), tablePath, partitionSpec, e);
 			}
 		} catch (InvalidOperationException | MetaException | TableNotExistException | PartitionSpecInvalidException e) {
-			throw new PartitionNotExistException(catalogName, tablePath, partitionSpec, e);
+			throw new PartitionNotExistException(getCatalogName(), tablePath, partitionSpec, e);
 		} catch (TException e) {
 			throw new CatalogException(
 				String.format("Failed to alter existing partition with new partition %s of table %s",
@@ -812,7 +803,7 @@ public class HiveCatalog implements Catalog {
 		// validate partition values
 		for (int i = 0; i < partCols.size(); i++) {
 			if (StringUtils.isNullOrWhitespaceOnly(partValues.get(i))) {
-				throw new PartitionSpecInvalidException(catalogName, partCols,
+				throw new PartitionSpecInvalidException(getCatalogName(), partCols,
 					new ObjectPath(hiveTable.getDbName(), hiveTable.getTableName()), partitionSpec);
 			}
 		}
@@ -836,7 +827,7 @@ public class HiveCatalog implements Catalog {
 
 	private void ensurePartitionedTable(ObjectPath tablePath, Table hiveTable) throws TableNotPartitionedException {
 		if (hiveTable.getPartitionKeysSize() == 0) {
-			throw new TableNotPartitionedException(catalogName, tablePath);
+			throw new TableNotPartitionedException(getCatalogName(), tablePath);
 		}
 	}
 
@@ -879,13 +870,13 @@ public class HiveCatalog implements Catalog {
 			throws PartitionSpecInvalidException {
 		Map<String, String> spec = partitionSpec.getPartitionSpec();
 		if (spec.size() != partitionKeys.size()) {
-			throw new PartitionSpecInvalidException(catalogName, partitionKeys, tablePath, partitionSpec);
+			throw new PartitionSpecInvalidException(getCatalogName(), partitionKeys, tablePath, partitionSpec);
 		}
 
 		List<String> values = new ArrayList<>(spec.size());
 		for (String key : partitionKeys) {
 			if (!spec.containsKey(key)) {
-				throw new PartitionSpecInvalidException(catalogName, partitionKeys, tablePath, partitionSpec);
+				throw new PartitionSpecInvalidException(getCatalogName(), partitionKeys, tablePath, partitionSpec);
 			} else {
 				values.add(spec.get(key));
 			}
@@ -927,10 +918,10 @@ public class HiveCatalog implements Catalog {
 		try {
 			client.createFunction(hiveFunction);
 		} catch (NoSuchObjectException e) {
-			throw new DatabaseNotExistException(catalogName, functionPath.getDatabaseName(), e);
+			throw new DatabaseNotExistException(getCatalogName(), functionPath.getDatabaseName(), e);
 		} catch (AlreadyExistsException e) {
 			if (!ignoreIfExists) {
-				throw new FunctionAlreadyExistException(catalogName, functionPath, e);
+				throw new FunctionAlreadyExistException(getCatalogName(), functionPath, e);
 			}
 		} catch (TException e) {
 			throw new CatalogException(
@@ -986,7 +977,7 @@ public class HiveCatalog implements Catalog {
 			client.dropFunction(functionPath.getDatabaseName(), functionPath.getObjectName());
 		} catch (NoSuchObjectException e) {
 			if (!ignoreIfNotExists) {
-				throw new FunctionNotExistException(catalogName, functionPath, e);
+				throw new FunctionNotExistException(getCatalogName(), functionPath, e);
 			}
 		} catch (TException e) {
 			throw new CatalogException(
@@ -1001,7 +992,7 @@ public class HiveCatalog implements Catalog {
 		// client.getFunctions() returns empty list when the database doesn't exist
 		// thus we need to explicitly check whether the database exists or not
 		if (!databaseExists(databaseName)) {
-			throw new DatabaseNotExistException(catalogName, databaseName);
+			throw new DatabaseNotExistException(getCatalogName(), databaseName);
 		}
 
 		try {
@@ -1033,7 +1024,7 @@ public class HiveCatalog implements Catalog {
 				return new HiveCatalogFunction(function.getClassName());
 			}
 		} catch (NoSuchObjectException e) {
-			throw new FunctionNotExistException(catalogName, functionPath, e);
+			throw new FunctionNotExistException(getCatalogName(), functionPath, e);
 		} catch (TException e) {
 			throw new CatalogException(
 				String.format("Failed to get function %s", functionPath.getFullName()), e);
diff --git a/flink-table/flink-table-api-java/pom.xml b/flink-table/flink-table-api-java/pom.xml
index f8a8fd7..d4eefd3 100644
--- a/flink-table/flink-table-api-java/pom.xml
+++ b/flink-table/flink-table-api-java/pom.xml
@@ -52,5 +52,10 @@ under the License.
 			<type>test-jar</type>
 			<scope>test</scope>
 		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils-junit</artifactId>
+		</dependency>
 	</dependencies>
 </project>
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
index d46a423..6d028f0 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
@@ -47,15 +47,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * A generic catalog implementation that holds all meta objects in memory.
  */
-public class GenericInMemoryCatalog implements Catalog {
+public class GenericInMemoryCatalog extends AbstractCatalog {
 	public static final String FLINK_IS_GENERIC_KEY = "is_generic";
 	public static final String FLINK_IS_GENERIC_VALUE = "true";
 
 	private static final String DEFAULT_DB = "default";
 
-	private final String defaultDatabase;
-
-	private final String catalogName;
 	private final Map<String, CatalogDatabase> databases;
 	private final Map<ObjectPath, CatalogBaseTable> tables;
 	private final Map<ObjectPath, CatalogFunction> functions;
@@ -71,11 +68,8 @@ public class GenericInMemoryCatalog implements Catalog {
 	}
 
 	public GenericInMemoryCatalog(String name, String defaultDatabase) {
-		checkArgument(!StringUtils.isNullOrWhitespaceOnly(name), "name cannot be null or empty");
-		checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), "defaultDatabase cannot be null or empty");
+		super(name, defaultDatabase);
 
-		this.catalogName = name;
-		this.defaultDatabase = defaultDatabase;
 		this.databases = new LinkedHashMap<>();
 		this.databases.put(defaultDatabase, new GenericCatalogDatabase(new HashMap<>(), ""));
 		this.tables = new LinkedHashMap<>();
@@ -99,11 +93,6 @@ public class GenericInMemoryCatalog implements Catalog {
 	// ------ databases ------
 
 	@Override
-	public String getDefaultDatabase() {
-		return defaultDatabase;
-	}
-
-	@Override
 	public void createDatabase(String databaseName, CatalogDatabase db, boolean ignoreIfExists)
 			throws DatabaseAlreadyExistException {
 		checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
@@ -111,7 +100,7 @@ public class GenericInMemoryCatalog implements Catalog {
 
 		if (databaseExists(databaseName)) {
 			if (!ignoreIfExists) {
-				throw new DatabaseAlreadyExistException(catalogName, databaseName);
+				throw new DatabaseAlreadyExistException(getCatalogName(), databaseName);
 			}
 		} else {
 			databases.put(databaseName, db.copy());
@@ -129,10 +118,10 @@ public class GenericInMemoryCatalog implements Catalog {
 			if (isDatabaseEmpty(databaseName)) {
 				databases.remove(databaseName);
 			} else {
-				throw new DatabaseNotEmptyException(catalogName, databaseName);
+				throw new DatabaseNotEmptyException(getCatalogName(), databaseName);
 			}
 		} else if (!ignoreIfNotExists) {
-			throw new DatabaseNotExistException(catalogName, databaseName);
+			throw new DatabaseNotExistException(getCatalogName(), databaseName);
 		}
 	}
 
@@ -161,7 +150,7 @@ public class GenericInMemoryCatalog implements Catalog {
 
 			databases.put(databaseName, newDatabase.copy());
 		} else if (!ignoreIfNotExists) {
-			throw new DatabaseNotExistException(catalogName, databaseName);
+			throw new DatabaseNotExistException(getCatalogName(), databaseName);
 		}
 	}
 
@@ -175,7 +164,7 @@ public class GenericInMemoryCatalog implements Catalog {
 		checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
 
 		if (!databaseExists(databaseName)) {
-			throw new DatabaseNotExistException(catalogName, databaseName);
+			throw new DatabaseNotExistException(getCatalogName(), databaseName);
 		} else {
 			return databases.get(databaseName).copy();
 		}
@@ -197,12 +186,12 @@ public class GenericInMemoryCatalog implements Catalog {
 		checkNotNull(table);
 
 		if (!databaseExists(tablePath.getDatabaseName())) {
-			throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName());
+			throw new DatabaseNotExistException(getCatalogName(), tablePath.getDatabaseName());
 		}
 
 		if (tableExists(tablePath)) {
 			if (!ignoreIfExists) {
-				throw new TableAlreadyExistException(catalogName, tablePath);
+				throw new TableAlreadyExistException(getCatalogName(), tablePath);
 			}
 		} else {
 			tables.put(tablePath, table.copy());
@@ -232,7 +221,7 @@ public class GenericInMemoryCatalog implements Catalog {
 
 			tables.put(tablePath, newTable.copy());
 		} else if (!ignoreIfNotExists) {
-			throw new TableNotExistException(catalogName, tablePath);
+			throw new TableNotExistException(getCatalogName(), tablePath);
 		}
 	}
 
@@ -251,7 +240,7 @@ public class GenericInMemoryCatalog implements Catalog {
 			partitionStats.remove(tablePath);
 			partitionColumnStats.remove(tablePath);
 		} else if (!ignoreIfNotExists) {
-			throw new TableNotExistException(catalogName, tablePath);
+			throw new TableNotExistException(getCatalogName(), tablePath);
 		}
 	}
 
@@ -265,7 +254,7 @@ public class GenericInMemoryCatalog implements Catalog {
 			ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName);
 
 			if (tableExists(newPath)) {
-				throw new TableAlreadyExistException(catalogName, newPath);
+				throw new TableAlreadyExistException(getCatalogName(), newPath);
 			} else {
 				tables.put(newPath, tables.remove(tablePath));
 
@@ -295,7 +284,7 @@ public class GenericInMemoryCatalog implements Catalog {
 				}
 			}
 		} else if (!ignoreIfNotExists) {
-			throw new TableNotExistException(catalogName, tablePath);
+			throw new TableNotExistException(getCatalogName(), tablePath);
 		}
 	}
 
@@ -304,7 +293,7 @@ public class GenericInMemoryCatalog implements Catalog {
 		checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName cannot be null or empty");
 
 		if (!databaseExists(databaseName)) {
-			throw new DatabaseNotExistException(catalogName, databaseName);
+			throw new DatabaseNotExistException(getCatalogName(), databaseName);
 		}
 
 		return tables.keySet().stream()
@@ -317,7 +306,7 @@ public class GenericInMemoryCatalog implements Catalog {
 		checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName cannot be null or empty");
 
 		if (!databaseExists(databaseName)) {
-			throw new DatabaseNotExistException(catalogName, databaseName);
+			throw new DatabaseNotExistException(getCatalogName(), databaseName);
 		}
 
 		return tables.keySet().stream()
@@ -331,7 +320,7 @@ public class GenericInMemoryCatalog implements Catalog {
 		checkNotNull(tablePath);
 
 		if (!tableExists(tablePath)) {
-			throw new TableNotExistException(catalogName, tablePath);
+			throw new TableNotExistException(getCatalogName(), tablePath);
 		} else {
 			return tables.get(tablePath).copy();
 		}
@@ -346,7 +335,7 @@ public class GenericInMemoryCatalog implements Catalog {
 
 	private void ensureTableExists(ObjectPath tablePath) throws TableNotExistException {
 		if (!tableExists(tablePath)) {
-			throw new TableNotExistException(catalogName, tablePath);
+			throw new TableNotExistException(getCatalogName(), tablePath);
 		}
 	}
 
@@ -359,12 +348,12 @@ public class GenericInMemoryCatalog implements Catalog {
 		checkNotNull(function);
 
 		if (!databaseExists(functionPath.getDatabaseName())) {
-			throw new DatabaseNotExistException(catalogName, functionPath.getDatabaseName());
+			throw new DatabaseNotExistException(getCatalogName(), functionPath.getDatabaseName());
 		}
 
 		if (functionExists(functionPath)) {
 			if (!ignoreIfExists) {
-				throw new FunctionAlreadyExistException(catalogName, functionPath);
+				throw new FunctionAlreadyExistException(getCatalogName(), functionPath);
 			}
 		} else {
 			functions.put(functionPath, function.copy());
@@ -389,7 +378,7 @@ public class GenericInMemoryCatalog implements Catalog {
 
 			functions.put(functionPath, newFunction.copy());
 		} else if (!ignoreIfNotExists) {
-			throw new FunctionNotExistException(catalogName, functionPath);
+			throw new FunctionNotExistException(getCatalogName(), functionPath);
 		}
 	}
 
@@ -400,7 +389,7 @@ public class GenericInMemoryCatalog implements Catalog {
 		if (functionExists(functionPath)) {
 			functions.remove(functionPath);
 		} else if (!ignoreIfNotExists) {
-			throw new FunctionNotExistException(catalogName, functionPath);
+			throw new FunctionNotExistException(getCatalogName(), functionPath);
 		}
 	}
 
@@ -409,7 +398,7 @@ public class GenericInMemoryCatalog implements Catalog {
 		checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName cannot be null or empty");
 
 		if (!databaseExists(databaseName)) {
-			throw new DatabaseNotExistException(catalogName, databaseName);
+			throw new DatabaseNotExistException(getCatalogName(), databaseName);
 		}
 
 		return functions.keySet().stream()
@@ -422,7 +411,7 @@ public class GenericInMemoryCatalog implements Catalog {
 		checkNotNull(functionPath);
 
 		if (!functionExists(functionPath)) {
-			throw new FunctionNotExistException(catalogName, functionPath);
+			throw new FunctionNotExistException(getCatalogName(), functionPath);
 		} else {
 			return functions.get(functionPath).copy();
 		}
@@ -449,7 +438,7 @@ public class GenericInMemoryCatalog implements Catalog {
 
 		if (partitionExists(tablePath, partitionSpec)) {
 			if (!ignoreIfExists) {
-				throw new PartitionAlreadyExistsException(catalogName, tablePath, partitionSpec);
+				throw new PartitionAlreadyExistsException(getCatalogName(), tablePath, partitionSpec);
 			}
 		}
 
@@ -467,7 +456,7 @@ public class GenericInMemoryCatalog implements Catalog {
 			partitionStats.get(tablePath).remove(partitionSpec);
 			partitionColumnStats.get(tablePath).remove(partitionSpec);
 		} else if (!ignoreIfNotExists) {
-			throw new PartitionNotExistException(catalogName, tablePath, partitionSpec);
+			throw new PartitionNotExistException(getCatalogName(), tablePath, partitionSpec);
 		}
 	}
 
@@ -490,7 +479,7 @@ public class GenericInMemoryCatalog implements Catalog {
 
 			partitions.get(tablePath).put(partitionSpec, newPartition.copy());
 		} else if (!ignoreIfNotExists) {
-			throw new PartitionNotExistException(catalogName, tablePath, partitionSpec);
+			throw new PartitionNotExistException(getCatalogName(), tablePath, partitionSpec);
 		}
 	}
 
@@ -532,7 +521,7 @@ public class GenericInMemoryCatalog implements Catalog {
 		checkNotNull(partitionSpec);
 
 		if (!partitionExists(tablePath, partitionSpec)) {
-			throw new PartitionNotExistException(catalogName, tablePath, partitionSpec);
+			throw new PartitionNotExistException(getCatalogName(), tablePath, partitionSpec);
 		}
 
 		return partitions.get(tablePath).get(partitionSpec).copy();
@@ -550,7 +539,7 @@ public class GenericInMemoryCatalog implements Catalog {
 	private void ensureFullPartitionSpec(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
 		throws TableNotExistException, PartitionSpecInvalidException {
 		if (!isFullPartitionSpec(tablePath, partitionSpec)) {
-			throw new PartitionSpecInvalidException(catalogName, ((CatalogTable) getTable(tablePath)).getPartitionKeys(),
+			throw new PartitionSpecInvalidException(getCatalogName(), ((CatalogTable) getTable(tablePath)).getPartitionKeys(),
 				tablePath, partitionSpec);
 		}
 	}
@@ -575,7 +564,7 @@ public class GenericInMemoryCatalog implements Catalog {
 
 	private void ensurePartitionedTable(ObjectPath tablePath) throws TableNotPartitionedException {
 		if (!isPartitionedTable(tablePath)) {
-			throw new TableNotPartitionedException(catalogName, tablePath);
+			throw new TableNotPartitionedException(getCatalogName(), tablePath);
 		}
 	}
 
@@ -601,7 +590,7 @@ public class GenericInMemoryCatalog implements Catalog {
 		checkNotNull(tablePath);
 
 		if (!tableExists(tablePath)) {
-			throw new TableNotExistException(catalogName, tablePath);
+			throw new TableNotExistException(getCatalogName(), tablePath);
 		}
 
 		CatalogTableStatistics result = tableStats.get(tablePath);
@@ -613,7 +602,7 @@ public class GenericInMemoryCatalog implements Catalog {
 		checkNotNull(tablePath);
 
 		if (!tableExists(tablePath)) {
-			throw new TableNotExistException(catalogName, tablePath);
+			throw new TableNotExistException(getCatalogName(), tablePath);
 		}
 
 		CatalogColumnStatistics result = tableColumnStats.get(tablePath);
@@ -627,7 +616,7 @@ public class GenericInMemoryCatalog implements Catalog {
 		checkNotNull(partitionSpec);
 
 		if (!partitionExists(tablePath, partitionSpec)) {
-			throw new PartitionNotExistException(catalogName, tablePath, partitionSpec);
+			throw new PartitionNotExistException(getCatalogName(), tablePath, partitionSpec);
 		}
 
 		CatalogTableStatistics result = partitionStats.get(tablePath).get(partitionSpec);
@@ -641,7 +630,7 @@ public class GenericInMemoryCatalog implements Catalog {
 		checkNotNull(partitionSpec);
 
 		if (!partitionExists(tablePath, partitionSpec)) {
-			throw new PartitionNotExistException(catalogName, tablePath, partitionSpec);
+			throw new PartitionNotExistException(getCatalogName(), tablePath, partitionSpec);
 		}
 
 		CatalogColumnStatistics result = partitionColumnStats.get(tablePath).get(partitionSpec);
@@ -657,7 +646,7 @@ public class GenericInMemoryCatalog implements Catalog {
 		if (tableExists(tablePath)) {
 			tableStats.put(tablePath, tableStatistics.copy());
 		} else if (!ignoreIfNotExists) {
-			throw new TableNotExistException(catalogName, tablePath);
+			throw new TableNotExistException(getCatalogName(), tablePath);
 		}
 	}
 
@@ -670,7 +659,7 @@ public class GenericInMemoryCatalog implements Catalog {
 		if (tableExists(tablePath)) {
 			tableColumnStats.put(tablePath, columnStatistics.copy());
 		} else if (!ignoreIfNotExists) {
-			throw new TableNotExistException(catalogName, tablePath);
+			throw new TableNotExistException(getCatalogName(), tablePath);
 		}
 	}
 
@@ -684,7 +673,7 @@ public class GenericInMemoryCatalog implements Catalog {
 		if (partitionExists(tablePath, partitionSpec)) {
 			partitionStats.get(tablePath).put(partitionSpec, partitionStatistics.copy());
 		} else if (!ignoreIfNotExists) {
-			throw new PartitionNotExistException(catalogName, tablePath, partitionSpec);
+			throw new PartitionNotExistException(getCatalogName(), tablePath, partitionSpec);
 		}
 	}
 
@@ -698,7 +687,7 @@ public class GenericInMemoryCatalog implements Catalog {
 		if (partitionExists(tablePath, partitionSpec)) {
 			partitionColumnStats.get(tablePath).put(partitionSpec, columnStatistics.copy());
 		} else if (!ignoreIfNotExists) {
-			throw new PartitionNotExistException(catalogName, tablePath, partitionSpec);
+			throw new PartitionNotExistException(getCatalogName(), tablePath, partitionSpec);
 		}
 	}
 
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/AbstractCatalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/AbstractCatalog.java
new file mode 100644
index 0000000..8d4d957
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/AbstractCatalog.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.StringUtils;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Abstract base class for {@link Catalog} implementations, which read and write metadata such as
+ * database/table/views/UDFs. It validates and holds the common catalog name and default database name.
+ */
+@PublicEvolving
+public abstract class AbstractCatalog implements Catalog {
+	private final String catalogName;
+	private final String defaultDatabase;
+
+	public AbstractCatalog(String catalogName, String defaultDatabase) {
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty");
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), "defaultDatabase cannot be null or empty");
+
+		this.catalogName = catalogName;
+		this.defaultDatabase = defaultDatabase;
+	}
+
+	public String getCatalogName() {
+		return catalogName;
+	}
+
+	@Override
+	public String getDefaultDatabase() {
+		return defaultDatabase;
+	}
+}