You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/05/31 18:08:28 UTC
[flink] branch master updated: [FLINK-12678][table] Add
AbstractCatalog to manage the common catalog name and default database name
for catalogs
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ba648a5 [FLINK-12678][table] Add AbstractCatalog to manage the common catalog name and default database name for catalogs
ba648a5 is described below
commit ba648a57c3bf65efb657095a5c682e2a852d1bdf
Author: bowen.li <bo...@gmail.com>
AuthorDate: Wed May 29 12:07:54 2019 -0700
[FLINK-12678][table] Add AbstractCatalog to manage the common catalog name and default database name for catalogs
---
.../flink/table/catalog/hive/HiveCatalog.java | 79 +++++++++-----------
flink-table/flink-table-api-java/pom.xml | 5 ++
.../table/catalog/GenericInMemoryCatalog.java | 85 ++++++++++------------
.../flink/table/catalog/AbstractCatalog.java | 51 +++++++++++++
4 files changed, 128 insertions(+), 92 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 159499c..4081296 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -20,9 +20,9 @@ package org.apache.flink.table.catalog.hive;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.AbstractCatalogTable;
import org.apache.flink.table.catalog.AbstractCatalogView;
-import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogFunction;
@@ -90,7 +90,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A catalog implementation for Hive.
*/
-public class HiveCatalog implements Catalog {
+public class HiveCatalog extends AbstractCatalog {
private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);
private static final String DEFAULT_DB = "default";
private static final StorageFormatFactory storageFormatFactory = new StorageFormatFactory();
@@ -106,10 +106,8 @@ public class HiveCatalog implements Catalog {
// because Hive's Function object doesn't have properties or other place to store the flag for Flink functions.
private static final String FLINK_FUNCTION_PREFIX = "flink:";
- protected final String catalogName;
protected final HiveConf hiveConf;
- private final String defaultDatabase;
protected IMetaStoreClient client;
public HiveCatalog(String catalogName, String hivemetastoreURI) {
@@ -121,10 +119,7 @@ public class HiveCatalog implements Catalog {
}
public HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf) {
- checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty");
- checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), "defaultDatabase cannot be null or empty");
- this.catalogName = catalogName;
- this.defaultDatabase = defaultDatabase;
+ super(catalogName, defaultDatabase);
this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null");
LOG.info("Created HiveCatalog '{}'", catalogName);
@@ -158,9 +153,9 @@ public class HiveCatalog implements Catalog {
LOG.info("Connected to Hive metastore");
}
- if (!databaseExists(defaultDatabase)) {
+ if (!databaseExists(getDefaultDatabase())) {
throw new CatalogException(String.format("Configured default database %s doesn't exist in catalog %s.",
- defaultDatabase, catalogName));
+ getDefaultDatabase(), getCatalogName()));
}
}
@@ -175,10 +170,6 @@ public class HiveCatalog implements Catalog {
// ------ databases ------
- public String getDefaultDatabase() throws CatalogException {
- return defaultDatabase;
- }
-
@Override
public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
Database hiveDatabase = getHiveDatabase(databaseName);
@@ -201,7 +192,7 @@ public class HiveCatalog implements Catalog {
client.createDatabase(hiveDatabase);
} catch (AlreadyExistsException e) {
if (!ignoreIfExists) {
- throw new DatabaseAlreadyExistException(catalogName, hiveDatabase.getName());
+ throw new DatabaseAlreadyExistException(getCatalogName(), hiveDatabase.getName());
}
} catch (TException e) {
throw new CatalogException(String.format("Failed to create database %s", hiveDatabase.getName()), e);
@@ -268,7 +259,7 @@ public class HiveCatalog implements Catalog {
return client.getAllDatabases();
} catch (TException e) {
throw new CatalogException(
- String.format("Failed to list all databases in %s", catalogName), e);
+ String.format("Failed to list all databases in %s", getCatalogName()), e);
}
}
@@ -291,10 +282,10 @@ public class HiveCatalog implements Catalog {
client.dropDatabase(name, true, ignoreIfNotExists);
} catch (NoSuchObjectException e) {
if (!ignoreIfNotExists) {
- throw new DatabaseNotExistException(catalogName, name);
+ throw new DatabaseNotExistException(getCatalogName(), name);
}
} catch (InvalidOperationException e) {
- throw new DatabaseNotEmptyException(catalogName, name);
+ throw new DatabaseNotEmptyException(getCatalogName(), name);
} catch (TException e) {
throw new CatalogException(String.format("Failed to drop database %s", name), e);
}
@@ -304,10 +295,10 @@ public class HiveCatalog implements Catalog {
try {
return client.getDatabase(databaseName);
} catch (NoSuchObjectException e) {
- throw new DatabaseNotExistException(catalogName, databaseName);
+ throw new DatabaseNotExistException(getCatalogName(), databaseName);
} catch (TException e) {
throw new CatalogException(
- String.format("Failed to get database %s from %s", databaseName, catalogName), e);
+ String.format("Failed to get database %s from %s", databaseName, getCatalogName()), e);
}
}
@@ -328,7 +319,7 @@ public class HiveCatalog implements Catalog {
checkNotNull(table, "table cannot be null");
if (!databaseExists(tablePath.getDatabaseName())) {
- throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName());
+ throw new DatabaseNotExistException(getCatalogName(), tablePath.getDatabaseName());
}
Table hiveTable = instantiateHiveTable(tablePath, table);
@@ -337,7 +328,7 @@ public class HiveCatalog implements Catalog {
client.createTable(hiveTable);
} catch (AlreadyExistsException e) {
if (!ignoreIfExists) {
- throw new TableAlreadyExistException(catalogName, tablePath);
+ throw new TableAlreadyExistException(getCatalogName(), tablePath);
}
} catch (TException e) {
throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e);
@@ -358,14 +349,14 @@ public class HiveCatalog implements Catalog {
// alter_table() doesn't throw a clear exception when new table already exists.
// Thus, check the table existence explicitly
if (tableExists(newPath)) {
- throw new TableAlreadyExistException(catalogName, newPath);
+ throw new TableAlreadyExistException(getCatalogName(), newPath);
} else {
Table table = getHiveTable(tablePath);
table.setTableName(newTableName);
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table);
}
} else if (!ignoreIfNotExists) {
- throw new TableNotExistException(catalogName, tablePath);
+ throw new TableNotExistException(getCatalogName(), tablePath);
}
} catch (TException e) {
throw new CatalogException(
@@ -426,7 +417,7 @@ public class HiveCatalog implements Catalog {
ignoreIfNotExists);
} catch (NoSuchObjectException e) {
if (!ignoreIfNotExists) {
- throw new TableNotExistException(catalogName, tablePath);
+ throw new TableNotExistException(getCatalogName(), tablePath);
}
} catch (TException e) {
throw new CatalogException(
@@ -441,7 +432,7 @@ public class HiveCatalog implements Catalog {
try {
return client.getAllTables(databaseName);
} catch (UnknownDBException e) {
- throw new DatabaseNotExistException(catalogName, databaseName);
+ throw new DatabaseNotExistException(getCatalogName(), databaseName);
} catch (TException e) {
throw new CatalogException(
String.format("Failed to list tables in database %s", databaseName), e);
@@ -458,7 +449,7 @@ public class HiveCatalog implements Catalog {
null, // table pattern
TableType.VIRTUAL_VIEW);
} catch (UnknownDBException e) {
- throw new DatabaseNotExistException(catalogName, databaseName);
+ throw new DatabaseNotExistException(getCatalogName(), databaseName);
} catch (TException e) {
throw new CatalogException(
String.format("Failed to list views in database %s", databaseName), e);
@@ -484,7 +475,7 @@ public class HiveCatalog implements Catalog {
try {
return client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
} catch (NoSuchObjectException e) {
- throw new TableNotExistException(catalogName, tablePath);
+ throw new TableNotExistException(getCatalogName(), tablePath);
} catch (TException e) {
throw new CatalogException(
String.format("Failed to get table %s from Hive metastore", tablePath.getFullName()), e);
@@ -661,7 +652,7 @@ public class HiveCatalog implements Catalog {
client.add_partition(instantiateHivePartition(hiveTable, partitionSpec, partition));
} catch (AlreadyExistsException e) {
if (!ignoreIfExists) {
- throw new PartitionAlreadyExistsException(catalogName, tablePath, partitionSpec);
+ throw new PartitionAlreadyExistsException(getCatalogName(), tablePath, partitionSpec);
}
} catch (TException e) {
throw new CatalogException(
@@ -681,10 +672,10 @@ public class HiveCatalog implements Catalog {
getOrderedFullPartitionValues(partitionSpec, getFieldNames(hiveTable.getPartitionKeys()), tablePath), true);
} catch (NoSuchObjectException e) {
if (!ignoreIfNotExists) {
- throw new PartitionNotExistException(catalogName, tablePath, partitionSpec, e);
+ throw new PartitionNotExistException(getCatalogName(), tablePath, partitionSpec, e);
}
} catch (MetaException | TableNotExistException | PartitionSpecInvalidException e) {
- throw new PartitionNotExistException(catalogName, tablePath, partitionSpec, e);
+ throw new PartitionNotExistException(getCatalogName(), tablePath, partitionSpec, e);
} catch (TException e) {
throw new CatalogException(
String.format("Failed to drop partition %s of table %s", partitionSpec, tablePath));
@@ -741,7 +732,7 @@ public class HiveCatalog implements Catalog {
Partition hivePartition = getHivePartition(tablePath, partitionSpec);
return instantiateCatalogPartition(hivePartition);
} catch (NoSuchObjectException | MetaException | TableNotExistException | PartitionSpecInvalidException e) {
- throw new PartitionNotExistException(catalogName, tablePath, partitionSpec, e);
+ throw new PartitionNotExistException(getCatalogName(), tablePath, partitionSpec, e);
} catch (TException e) {
throw new CatalogException(
String.format("Failed to get partition %s of table %s", partitionSpec, tablePath), e);
@@ -769,7 +760,7 @@ public class HiveCatalog implements Catalog {
if (ignoreIfNotExists) {
return;
}
- throw new PartitionNotExistException(catalogName, tablePath, partitionSpec);
+ throw new PartitionNotExistException(getCatalogName(), tablePath, partitionSpec);
}
Partition newHivePartition = instantiateHivePartition(hiveTable, partitionSpec, newPartition);
if (newHivePartition.getSd().getLocation() == null) {
@@ -782,10 +773,10 @@ public class HiveCatalog implements Catalog {
);
} catch (NoSuchObjectException e) {
if (!ignoreIfNotExists) {
- throw new PartitionNotExistException(catalogName, tablePath, partitionSpec, e);
+ throw new PartitionNotExistException(getCatalogName(), tablePath, partitionSpec, e);
}
} catch (InvalidOperationException | MetaException | TableNotExistException | PartitionSpecInvalidException e) {
- throw new PartitionNotExistException(catalogName, tablePath, partitionSpec, e);
+ throw new PartitionNotExistException(getCatalogName(), tablePath, partitionSpec, e);
} catch (TException e) {
throw new CatalogException(
String.format("Failed to alter existing partition with new partition %s of table %s",
@@ -812,7 +803,7 @@ public class HiveCatalog implements Catalog {
// validate partition values
for (int i = 0; i < partCols.size(); i++) {
if (StringUtils.isNullOrWhitespaceOnly(partValues.get(i))) {
- throw new PartitionSpecInvalidException(catalogName, partCols,
+ throw new PartitionSpecInvalidException(getCatalogName(), partCols,
new ObjectPath(hiveTable.getDbName(), hiveTable.getTableName()), partitionSpec);
}
}
@@ -836,7 +827,7 @@ public class HiveCatalog implements Catalog {
private void ensurePartitionedTable(ObjectPath tablePath, Table hiveTable) throws TableNotPartitionedException {
if (hiveTable.getPartitionKeysSize() == 0) {
- throw new TableNotPartitionedException(catalogName, tablePath);
+ throw new TableNotPartitionedException(getCatalogName(), tablePath);
}
}
@@ -879,13 +870,13 @@ public class HiveCatalog implements Catalog {
throws PartitionSpecInvalidException {
Map<String, String> spec = partitionSpec.getPartitionSpec();
if (spec.size() != partitionKeys.size()) {
- throw new PartitionSpecInvalidException(catalogName, partitionKeys, tablePath, partitionSpec);
+ throw new PartitionSpecInvalidException(getCatalogName(), partitionKeys, tablePath, partitionSpec);
}
List<String> values = new ArrayList<>(spec.size());
for (String key : partitionKeys) {
if (!spec.containsKey(key)) {
- throw new PartitionSpecInvalidException(catalogName, partitionKeys, tablePath, partitionSpec);
+ throw new PartitionSpecInvalidException(getCatalogName(), partitionKeys, tablePath, partitionSpec);
} else {
values.add(spec.get(key));
}
@@ -927,10 +918,10 @@ public class HiveCatalog implements Catalog {
try {
client.createFunction(hiveFunction);
} catch (NoSuchObjectException e) {
- throw new DatabaseNotExistException(catalogName, functionPath.getDatabaseName(), e);
+ throw new DatabaseNotExistException(getCatalogName(), functionPath.getDatabaseName(), e);
} catch (AlreadyExistsException e) {
if (!ignoreIfExists) {
- throw new FunctionAlreadyExistException(catalogName, functionPath, e);
+ throw new FunctionAlreadyExistException(getCatalogName(), functionPath, e);
}
} catch (TException e) {
throw new CatalogException(
@@ -986,7 +977,7 @@ public class HiveCatalog implements Catalog {
client.dropFunction(functionPath.getDatabaseName(), functionPath.getObjectName());
} catch (NoSuchObjectException e) {
if (!ignoreIfNotExists) {
- throw new FunctionNotExistException(catalogName, functionPath, e);
+ throw new FunctionNotExistException(getCatalogName(), functionPath, e);
}
} catch (TException e) {
throw new CatalogException(
@@ -1001,7 +992,7 @@ public class HiveCatalog implements Catalog {
// client.getFunctions() returns empty list when the database doesn't exist
// thus we need to explicitly check whether the database exists or not
if (!databaseExists(databaseName)) {
- throw new DatabaseNotExistException(catalogName, databaseName);
+ throw new DatabaseNotExistException(getCatalogName(), databaseName);
}
try {
@@ -1033,7 +1024,7 @@ public class HiveCatalog implements Catalog {
return new HiveCatalogFunction(function.getClassName());
}
} catch (NoSuchObjectException e) {
- throw new FunctionNotExistException(catalogName, functionPath, e);
+ throw new FunctionNotExistException(getCatalogName(), functionPath, e);
} catch (TException e) {
throw new CatalogException(
String.format("Failed to get function %s", functionPath.getFullName()), e);
diff --git a/flink-table/flink-table-api-java/pom.xml b/flink-table/flink-table-api-java/pom.xml
index f8a8fd7..d4eefd3 100644
--- a/flink-table/flink-table-api-java/pom.xml
+++ b/flink-table/flink-table-api-java/pom.xml
@@ -52,5 +52,10 @@ under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils-junit</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
index d46a423..6d028f0 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
@@ -47,15 +47,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A generic catalog implementation that holds all meta objects in memory.
*/
-public class GenericInMemoryCatalog implements Catalog {
+public class GenericInMemoryCatalog extends AbstractCatalog {
public static final String FLINK_IS_GENERIC_KEY = "is_generic";
public static final String FLINK_IS_GENERIC_VALUE = "true";
private static final String DEFAULT_DB = "default";
- private final String defaultDatabase;
-
- private final String catalogName;
private final Map<String, CatalogDatabase> databases;
private final Map<ObjectPath, CatalogBaseTable> tables;
private final Map<ObjectPath, CatalogFunction> functions;
@@ -71,11 +68,8 @@ public class GenericInMemoryCatalog implements Catalog {
}
public GenericInMemoryCatalog(String name, String defaultDatabase) {
- checkArgument(!StringUtils.isNullOrWhitespaceOnly(name), "name cannot be null or empty");
- checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), "defaultDatabase cannot be null or empty");
+ super(name, defaultDatabase);
- this.catalogName = name;
- this.defaultDatabase = defaultDatabase;
this.databases = new LinkedHashMap<>();
this.databases.put(defaultDatabase, new GenericCatalogDatabase(new HashMap<>(), ""));
this.tables = new LinkedHashMap<>();
@@ -99,11 +93,6 @@ public class GenericInMemoryCatalog implements Catalog {
// ------ databases ------
@Override
- public String getDefaultDatabase() {
- return defaultDatabase;
- }
-
- @Override
public void createDatabase(String databaseName, CatalogDatabase db, boolean ignoreIfExists)
throws DatabaseAlreadyExistException {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
@@ -111,7 +100,7 @@ public class GenericInMemoryCatalog implements Catalog {
if (databaseExists(databaseName)) {
if (!ignoreIfExists) {
- throw new DatabaseAlreadyExistException(catalogName, databaseName);
+ throw new DatabaseAlreadyExistException(getCatalogName(), databaseName);
}
} else {
databases.put(databaseName, db.copy());
@@ -129,10 +118,10 @@ public class GenericInMemoryCatalog implements Catalog {
if (isDatabaseEmpty(databaseName)) {
databases.remove(databaseName);
} else {
- throw new DatabaseNotEmptyException(catalogName, databaseName);
+ throw new DatabaseNotEmptyException(getCatalogName(), databaseName);
}
} else if (!ignoreIfNotExists) {
- throw new DatabaseNotExistException(catalogName, databaseName);
+ throw new DatabaseNotExistException(getCatalogName(), databaseName);
}
}
@@ -161,7 +150,7 @@ public class GenericInMemoryCatalog implements Catalog {
databases.put(databaseName, newDatabase.copy());
} else if (!ignoreIfNotExists) {
- throw new DatabaseNotExistException(catalogName, databaseName);
+ throw new DatabaseNotExistException(getCatalogName(), databaseName);
}
}
@@ -175,7 +164,7 @@ public class GenericInMemoryCatalog implements Catalog {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
if (!databaseExists(databaseName)) {
- throw new DatabaseNotExistException(catalogName, databaseName);
+ throw new DatabaseNotExistException(getCatalogName(), databaseName);
} else {
return databases.get(databaseName).copy();
}
@@ -197,12 +186,12 @@ public class GenericInMemoryCatalog implements Catalog {
checkNotNull(table);
if (!databaseExists(tablePath.getDatabaseName())) {
- throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName());
+ throw new DatabaseNotExistException(getCatalogName(), tablePath.getDatabaseName());
}
if (tableExists(tablePath)) {
if (!ignoreIfExists) {
- throw new TableAlreadyExistException(catalogName, tablePath);
+ throw new TableAlreadyExistException(getCatalogName(), tablePath);
}
} else {
tables.put(tablePath, table.copy());
@@ -232,7 +221,7 @@ public class GenericInMemoryCatalog implements Catalog {
tables.put(tablePath, newTable.copy());
} else if (!ignoreIfNotExists) {
- throw new TableNotExistException(catalogName, tablePath);
+ throw new TableNotExistException(getCatalogName(), tablePath);
}
}
@@ -251,7 +240,7 @@ public class GenericInMemoryCatalog implements Catalog {
partitionStats.remove(tablePath);
partitionColumnStats.remove(tablePath);
} else if (!ignoreIfNotExists) {
- throw new TableNotExistException(catalogName, tablePath);
+ throw new TableNotExistException(getCatalogName(), tablePath);
}
}
@@ -265,7 +254,7 @@ public class GenericInMemoryCatalog implements Catalog {
ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName);
if (tableExists(newPath)) {
- throw new TableAlreadyExistException(catalogName, newPath);
+ throw new TableAlreadyExistException(getCatalogName(), newPath);
} else {
tables.put(newPath, tables.remove(tablePath));
@@ -295,7 +284,7 @@ public class GenericInMemoryCatalog implements Catalog {
}
}
} else if (!ignoreIfNotExists) {
- throw new TableNotExistException(catalogName, tablePath);
+ throw new TableNotExistException(getCatalogName(), tablePath);
}
}
@@ -304,7 +293,7 @@ public class GenericInMemoryCatalog implements Catalog {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName cannot be null or empty");
if (!databaseExists(databaseName)) {
- throw new DatabaseNotExistException(catalogName, databaseName);
+ throw new DatabaseNotExistException(getCatalogName(), databaseName);
}
return tables.keySet().stream()
@@ -317,7 +306,7 @@ public class GenericInMemoryCatalog implements Catalog {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName cannot be null or empty");
if (!databaseExists(databaseName)) {
- throw new DatabaseNotExistException(catalogName, databaseName);
+ throw new DatabaseNotExistException(getCatalogName(), databaseName);
}
return tables.keySet().stream()
@@ -331,7 +320,7 @@ public class GenericInMemoryCatalog implements Catalog {
checkNotNull(tablePath);
if (!tableExists(tablePath)) {
- throw new TableNotExistException(catalogName, tablePath);
+ throw new TableNotExistException(getCatalogName(), tablePath);
} else {
return tables.get(tablePath).copy();
}
@@ -346,7 +335,7 @@ public class GenericInMemoryCatalog implements Catalog {
private void ensureTableExists(ObjectPath tablePath) throws TableNotExistException {
if (!tableExists(tablePath)) {
- throw new TableNotExistException(catalogName, tablePath);
+ throw new TableNotExistException(getCatalogName(), tablePath);
}
}
@@ -359,12 +348,12 @@ public class GenericInMemoryCatalog implements Catalog {
checkNotNull(function);
if (!databaseExists(functionPath.getDatabaseName())) {
- throw new DatabaseNotExistException(catalogName, functionPath.getDatabaseName());
+ throw new DatabaseNotExistException(getCatalogName(), functionPath.getDatabaseName());
}
if (functionExists(functionPath)) {
if (!ignoreIfExists) {
- throw new FunctionAlreadyExistException(catalogName, functionPath);
+ throw new FunctionAlreadyExistException(getCatalogName(), functionPath);
}
} else {
functions.put(functionPath, function.copy());
@@ -389,7 +378,7 @@ public class GenericInMemoryCatalog implements Catalog {
functions.put(functionPath, newFunction.copy());
} else if (!ignoreIfNotExists) {
- throw new FunctionNotExistException(catalogName, functionPath);
+ throw new FunctionNotExistException(getCatalogName(), functionPath);
}
}
@@ -400,7 +389,7 @@ public class GenericInMemoryCatalog implements Catalog {
if (functionExists(functionPath)) {
functions.remove(functionPath);
} else if (!ignoreIfNotExists) {
- throw new FunctionNotExistException(catalogName, functionPath);
+ throw new FunctionNotExistException(getCatalogName(), functionPath);
}
}
@@ -409,7 +398,7 @@ public class GenericInMemoryCatalog implements Catalog {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName cannot be null or empty");
if (!databaseExists(databaseName)) {
- throw new DatabaseNotExistException(catalogName, databaseName);
+ throw new DatabaseNotExistException(getCatalogName(), databaseName);
}
return functions.keySet().stream()
@@ -422,7 +411,7 @@ public class GenericInMemoryCatalog implements Catalog {
checkNotNull(functionPath);
if (!functionExists(functionPath)) {
- throw new FunctionNotExistException(catalogName, functionPath);
+ throw new FunctionNotExistException(getCatalogName(), functionPath);
} else {
return functions.get(functionPath).copy();
}
@@ -449,7 +438,7 @@ public class GenericInMemoryCatalog implements Catalog {
if (partitionExists(tablePath, partitionSpec)) {
if (!ignoreIfExists) {
- throw new PartitionAlreadyExistsException(catalogName, tablePath, partitionSpec);
+ throw new PartitionAlreadyExistsException(getCatalogName(), tablePath, partitionSpec);
}
}
@@ -467,7 +456,7 @@ public class GenericInMemoryCatalog implements Catalog {
partitionStats.get(tablePath).remove(partitionSpec);
partitionColumnStats.get(tablePath).remove(partitionSpec);
} else if (!ignoreIfNotExists) {
- throw new PartitionNotExistException(catalogName, tablePath, partitionSpec);
+ throw new PartitionNotExistException(getCatalogName(), tablePath, partitionSpec);
}
}
@@ -490,7 +479,7 @@ public class GenericInMemoryCatalog implements Catalog {
partitions.get(tablePath).put(partitionSpec, newPartition.copy());
} else if (!ignoreIfNotExists) {
- throw new PartitionNotExistException(catalogName, tablePath, partitionSpec);
+ throw new PartitionNotExistException(getCatalogName(), tablePath, partitionSpec);
}
}
@@ -532,7 +521,7 @@ public class GenericInMemoryCatalog implements Catalog {
checkNotNull(partitionSpec);
if (!partitionExists(tablePath, partitionSpec)) {
- throw new PartitionNotExistException(catalogName, tablePath, partitionSpec);
+ throw new PartitionNotExistException(getCatalogName(), tablePath, partitionSpec);
}
return partitions.get(tablePath).get(partitionSpec).copy();
@@ -550,7 +539,7 @@ public class GenericInMemoryCatalog implements Catalog {
private void ensureFullPartitionSpec(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
throws TableNotExistException, PartitionSpecInvalidException {
if (!isFullPartitionSpec(tablePath, partitionSpec)) {
- throw new PartitionSpecInvalidException(catalogName, ((CatalogTable) getTable(tablePath)).getPartitionKeys(),
+ throw new PartitionSpecInvalidException(getCatalogName(), ((CatalogTable) getTable(tablePath)).getPartitionKeys(),
tablePath, partitionSpec);
}
}
@@ -575,7 +564,7 @@ public class GenericInMemoryCatalog implements Catalog {
private void ensurePartitionedTable(ObjectPath tablePath) throws TableNotPartitionedException {
if (!isPartitionedTable(tablePath)) {
- throw new TableNotPartitionedException(catalogName, tablePath);
+ throw new TableNotPartitionedException(getCatalogName(), tablePath);
}
}
@@ -601,7 +590,7 @@ public class GenericInMemoryCatalog implements Catalog {
checkNotNull(tablePath);
if (!tableExists(tablePath)) {
- throw new TableNotExistException(catalogName, tablePath);
+ throw new TableNotExistException(getCatalogName(), tablePath);
}
CatalogTableStatistics result = tableStats.get(tablePath);
@@ -613,7 +602,7 @@ public class GenericInMemoryCatalog implements Catalog {
checkNotNull(tablePath);
if (!tableExists(tablePath)) {
- throw new TableNotExistException(catalogName, tablePath);
+ throw new TableNotExistException(getCatalogName(), tablePath);
}
CatalogColumnStatistics result = tableColumnStats.get(tablePath);
@@ -627,7 +616,7 @@ public class GenericInMemoryCatalog implements Catalog {
checkNotNull(partitionSpec);
if (!partitionExists(tablePath, partitionSpec)) {
- throw new PartitionNotExistException(catalogName, tablePath, partitionSpec);
+ throw new PartitionNotExistException(getCatalogName(), tablePath, partitionSpec);
}
CatalogTableStatistics result = partitionStats.get(tablePath).get(partitionSpec);
@@ -641,7 +630,7 @@ public class GenericInMemoryCatalog implements Catalog {
checkNotNull(partitionSpec);
if (!partitionExists(tablePath, partitionSpec)) {
- throw new PartitionNotExistException(catalogName, tablePath, partitionSpec);
+ throw new PartitionNotExistException(getCatalogName(), tablePath, partitionSpec);
}
CatalogColumnStatistics result = partitionColumnStats.get(tablePath).get(partitionSpec);
@@ -657,7 +646,7 @@ public class GenericInMemoryCatalog implements Catalog {
if (tableExists(tablePath)) {
tableStats.put(tablePath, tableStatistics.copy());
} else if (!ignoreIfNotExists) {
- throw new TableNotExistException(catalogName, tablePath);
+ throw new TableNotExistException(getCatalogName(), tablePath);
}
}
@@ -670,7 +659,7 @@ public class GenericInMemoryCatalog implements Catalog {
if (tableExists(tablePath)) {
tableColumnStats.put(tablePath, columnStatistics.copy());
} else if (!ignoreIfNotExists) {
- throw new TableNotExistException(catalogName, tablePath);
+ throw new TableNotExistException(getCatalogName(), tablePath);
}
}
@@ -684,7 +673,7 @@ public class GenericInMemoryCatalog implements Catalog {
if (partitionExists(tablePath, partitionSpec)) {
partitionStats.get(tablePath).put(partitionSpec, partitionStatistics.copy());
} else if (!ignoreIfNotExists) {
- throw new PartitionNotExistException(catalogName, tablePath, partitionSpec);
+ throw new PartitionNotExistException(getCatalogName(), tablePath, partitionSpec);
}
}
@@ -698,7 +687,7 @@ public class GenericInMemoryCatalog implements Catalog {
if (partitionExists(tablePath, partitionSpec)) {
partitionColumnStats.get(tablePath).put(partitionSpec, columnStatistics.copy());
} else if (!ignoreIfNotExists) {
- throw new PartitionNotExistException(catalogName, tablePath, partitionSpec);
+ throw new PartitionNotExistException(getCatalogName(), tablePath, partitionSpec);
}
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/AbstractCatalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/AbstractCatalog.java
new file mode 100644
index 0000000..8d4d957
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/AbstractCatalog.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.StringUtils;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Abstract base class for catalogs, which read and write metadata such as database/table/views/UDFs
+ * from a registered catalog. It manages the catalog name and default database name common to them.
+ */
+@PublicEvolving
+public abstract class AbstractCatalog implements Catalog {
+ private final String catalogName;
+ private final String defaultDatabase;
+
+ public AbstractCatalog(String catalogName, String defaultDatabase) {
+ checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty");
+ checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), "defaultDatabase cannot be null or empty");
+
+ this.catalogName = catalogName;
+ this.defaultDatabase = defaultDatabase;
+ }
+
+ public String getCatalogName() {
+ return catalogName;
+ }
+
+ @Override
+ public String getDefaultDatabase() {
+ return defaultDatabase;
+ }
+}