You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/05/21 02:29:05 UTC
[flink] branch master updated: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCatalog
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3fba21a [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCatalog
3fba21a is described below
commit 3fba21a0ce5534b8c95a2bb9e8547b0061e5bb8f
Author: Xuefu Zhang <xu...@alibaba-inc.com>
AuthorDate: Sun May 19 08:26:09 2019 -0700
[FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCatalog
This PR combines GenericHiveMetastoreCatalog and HiveCatalog into a single class.
This closes #8480.
---
.../catalog/hive/GenericHiveMetastoreCatalog.java | 357 --------------
.../flink/table/catalog/hive/HiveCatalog.java | 519 ++++++++++++++++++---
.../flink/table/catalog/hive/HiveCatalogBase.java | 432 -----------------
.../table/catalog/hive/HiveCatalogDatabase.java | 33 +-
.../flink/table/catalog/hive/HiveCatalogTable.java | 58 +--
.../flink/table/catalog/hive/HiveCatalogView.java | 64 +--
.../hive/GenericHiveMetastoreCatalogTest.java | 2 +-
.../flink/table/catalog/hive/HiveTestUtils.java | 7 -
...gDatabase.java => AbstractCatalogDatabase.java} | 30 +-
...gDatabase.java => AbstractCatalogFunction.java} | 41 +-
...Database.java => AbstractCatalogPartition.java} | 29 +-
...CatalogTable.java => AbstractCatalogTable.java} | 40 +-
...icCatalogView.java => AbstractCatalogView.java} | 47 +-
.../table/catalog/GenericCatalogDatabase.java | 31 +-
.../table/catalog/GenericCatalogFunction.java | 39 +-
.../table/catalog/GenericCatalogPartition.java | 23 +-
.../flink/table/catalog/GenericCatalogTable.java | 62 +--
.../flink/table/catalog/GenericCatalogView.java | 71 +--
.../table/catalog/GenericInMemoryCatalog.java | 5 +-
.../table/catalog/GenericInMemoryCatalogTest.java | 10 +-
.../apache/flink/table/catalog/CatalogTable.java | 9 +
21 files changed, 606 insertions(+), 1303 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
deleted file mode 100644
index b906e5c..0000000
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
+++ /dev/null
@@ -1,357 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog.hive;
-
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogDatabase;
-import org.apache.flink.table.catalog.CatalogFunction;
-import org.apache.flink.table.catalog.CatalogPartition;
-import org.apache.flink.table.catalog.CatalogPartitionSpec;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogView;
-import org.apache.flink.table.catalog.GenericCatalogDatabase;
-import org.apache.flink.table.catalog.GenericCatalogTable;
-import org.apache.flink.table.catalog.GenericCatalogView;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
-import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
-import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
-import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
-import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
-import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
-import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
-import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-/**
- * A catalog that persists all Flink streaming and batch metadata by using Hive metastore as a persistent storage.
- */
-public class GenericHiveMetastoreCatalog extends HiveCatalogBase {
- private static final Logger LOG = LoggerFactory.getLogger(GenericHiveMetastoreCatalog.class);
-
- // Prefix used to distinguish properties created by Hive and Flink,
- // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore.
- private static final String FLINK_PROPERTY_PREFIX = "flink.";
-
- public GenericHiveMetastoreCatalog(String catalogName, String hivemetastoreURI) {
- super(catalogName, hivemetastoreURI);
-
- LOG.info("Created GenericHiveMetastoreCatalog '{}'", catalogName);
- }
-
- public GenericHiveMetastoreCatalog(String catalogName, HiveConf hiveConf) {
- super(catalogName, hiveConf);
- }
-
- public GenericHiveMetastoreCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf) {
- super(catalogName, defaultDatabase, hiveConf);
-
- LOG.info("Created GenericHiveMetastoreCatalog '{}'", catalogName);
- }
-
- // ------ databases ------
-
- @Override
- protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) {
- return new GenericCatalogDatabase(
- retrieveFlinkProperties(hiveDatabase.getParameters()),
- hiveDatabase.getDescription()
- );
- }
-
- @Override
- protected Database createHiveDatabase(String databaseName, CatalogDatabase catalogDatabase) {
- return new Database(
- databaseName,
- catalogDatabase.getComment(),
- // HDFS location URI which GenericCatalogDatabase shouldn't care
- null,
- maskFlinkProperties(catalogDatabase.getProperties()));
- }
-
- // ------ tables and views------
-
- @Override
- protected void validateCatalogBaseTable(CatalogBaseTable table)
- throws CatalogException {
- if (!(table instanceof GenericCatalogTable) && !(table instanceof GenericCatalogView)) {
- throw new CatalogException(
- "GenericHiveMetastoreCatalog can only operate on GenericCatalogTable and GenericCatalogView.");
- }
- }
-
- @Override
- protected CatalogBaseTable createCatalogBaseTable(Table hiveTable) {
- // Table schema
- TableSchema tableSchema = HiveTableUtil.createTableSchema(
- hiveTable.getSd().getCols(), hiveTable.getPartitionKeys());
-
- // Table properties
- Map<String, String> properties = retrieveFlinkProperties(hiveTable.getParameters());
-
- // Table comment
- String comment = properties.remove(HiveTableConfig.TABLE_COMMENT);
-
- // Partition keys
- List<String> partitionKeys = new ArrayList<>();
-
- if (!hiveTable.getPartitionKeys().isEmpty()) {
- partitionKeys = hiveTable.getPartitionKeys().stream()
- .map(fs -> fs.getName())
- .collect(Collectors.toList());
- }
-
- if (TableType.valueOf(hiveTable.getTableType()) == TableType.VIRTUAL_VIEW) {
- return new GenericCatalogView(
- hiveTable.getViewOriginalText(),
- hiveTable.getViewExpandedText(),
- tableSchema,
- properties,
- comment
- );
- } else {
- return new GenericCatalogTable(
- tableSchema, partitionKeys, properties, comment);
- }
- }
-
- @Override
- protected Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) {
- Map<String, String> properties = new HashMap<>(table.getProperties());
-
- // Table comment
- properties.put(HiveTableConfig.TABLE_COMMENT, table.getComment());
-
- Table hiveTable = new Table();
- hiveTable.setDbName(tablePath.getDatabaseName());
- hiveTable.setTableName(tablePath.getObjectName());
- hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
-
- // Table properties
- hiveTable.setParameters(maskFlinkProperties(properties));
-
- // Hive table's StorageDescriptor
- StorageDescriptor sd = new StorageDescriptor();
- sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>()));
-
- List<FieldSchema> allColumns = HiveTableUtil.createHiveColumns(table.getSchema());
-
- // Table columns and partition keys
- if (table instanceof CatalogTable) {
- CatalogTable catalogTable = (CatalogTable) table;
-
- if (catalogTable.isPartitioned()) {
- int partitionKeySize = catalogTable.getPartitionKeys().size();
- List<FieldSchema> regularColumns = allColumns.subList(0, allColumns.size() - partitionKeySize);
- List<FieldSchema> partitionColumns = allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size());
-
- sd.setCols(regularColumns);
- hiveTable.setPartitionKeys(partitionColumns);
- } else {
- sd.setCols(allColumns);
- hiveTable.setPartitionKeys(new ArrayList<>());
- }
- } else if (table instanceof CatalogView) {
- CatalogView view = (CatalogView) table;
-
- // TODO: [FLINK-12398] Support partitioned view in catalog API
- sd.setCols(allColumns);
- hiveTable.setPartitionKeys(new ArrayList<>());
-
- hiveTable.setViewOriginalText(view.getOriginalQuery());
- hiveTable.setViewExpandedText(view.getExpandedQuery());
- hiveTable.setTableType(TableType.VIRTUAL_VIEW.name());
- } else {
- throw new CatalogException(
- "GenericHiveMetastoreCatalog only supports CatalogTable and CatalogView");
- }
-
- hiveTable.setSd(sd);
-
- return hiveTable;
- }
-
- // ------ partitions ------
-
- @Override
- public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
- throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
- throws PartitionNotExistException, CatalogException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists)
- throws PartitionNotExistException, CatalogException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
- throws TableNotExistException, TableNotPartitionedException, CatalogException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
- throws TableNotExistException, TableNotPartitionedException, CatalogException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
- throws PartitionNotExistException, CatalogException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
- throws CatalogException {
- throw new UnsupportedOperationException();
- }
-
- // ------ functions ------
-
- @Override
- public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
- throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
- throws FunctionNotExistException, CatalogException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
- throws FunctionNotExistException, CatalogException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public List<String> listFunctions(String dbName) throws DatabaseNotExistException, CatalogException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean functionExists(ObjectPath functionPath) throws CatalogException {
- throw new UnsupportedOperationException();
- }
-
- // ------ statistics ------
-
- @Override
- public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists)
- throws TableNotExistException, CatalogException {
-
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists)
- throws TableNotExistException, CatalogException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics,
- boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics,
- boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
- throws PartitionNotExistException, CatalogException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
- throws PartitionNotExistException, CatalogException {
- throw new UnsupportedOperationException();
- }
-
- // ------ utils ------
-
- /**
- * Filter out Hive-created properties, and return Flink-created properties.
- */
- private static Map<String, String> retrieveFlinkProperties(Map<String, String> hiveTableParams) {
- return hiveTableParams.entrySet().stream()
- .filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
- .collect(Collectors.toMap(e -> e.getKey().replace(FLINK_PROPERTY_PREFIX, ""), e -> e.getValue()));
- }
-
- /**
- * Add a prefix to Flink-created properties to distinguish them from Hive-created properties.
- */
- private static Map<String, String> maskFlinkProperties(Map<String, String> properties) {
- return properties.entrySet().stream()
- .filter(e -> e.getKey() != null && e.getValue() != null)
- .collect(Collectors.toMap(e -> FLINK_PROPERTY_PREFIX + e.getKey(), e -> e.getValue()));
- }
-}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 66826eb..13c5fde 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -19,32 +19,54 @@
package org.apache.flink.table.catalog.hive;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.AbstractCatalogTable;
+import org.apache.flink.table.catalog.AbstractCatalogView;
+import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.GenericCatalogDatabase;
+import org.apache.flink.table.catalog.GenericCatalogTable;
+import org.apache.flink.table.catalog.GenericCatalogView;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.util.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,117 +76,486 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* A catalog implementation for Hive.
*/
-public class HiveCatalog extends HiveCatalogBase {
+public class HiveCatalog implements Catalog {
private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);
+ private static final String DEFAULT_DB = "default";
- public HiveCatalog(String catalogName, String hivemetastoreURI) {
- super(catalogName, hivemetastoreURI);
+ // Prefix used to distinguish properties created by Hive and Flink,
+ // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore.
+ private static final String FLINK_PROPERTY_PREFIX = "flink.";
+ private static final String FLINK_PROPERTY_IS_GENERIC = FLINK_PROPERTY_PREFIX + GenericInMemoryCatalog.FLINK_IS_GENERIC_KEY;
- LOG.info("Created HiveCatalog '{}'", catalogName);
+ protected final String catalogName;
+ protected final HiveConf hiveConf;
+
+ private final String defaultDatabase;
+ protected IMetaStoreClient client;
+
+ public HiveCatalog(String catalogName, String hivemetastoreURI) {
+ this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI));
}
public HiveCatalog(String catalogName, HiveConf hiveConf) {
- super(catalogName, hiveConf);
+ this(catalogName, DEFAULT_DB, hiveConf);
+ }
+
+ public HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf) {
+ checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty");
+ checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), "defaultDatabase cannot be null or empty");
+ this.catalogName = catalogName;
+ this.defaultDatabase = defaultDatabase;
+ this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null");
LOG.info("Created HiveCatalog '{}'", catalogName);
}
+ private static HiveConf getHiveConf(String hiveMetastoreURI) {
+ checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), "hiveMetastoreURI cannot be null or empty");
+
+ HiveConf hiveConf = new HiveConf();
+ hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetastoreURI);
+ return hiveConf;
+ }
+
+ private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
+ try {
+ return RetryingMetaStoreClient.getProxy(
+ hiveConf,
+ null,
+ null,
+ HiveMetaStoreClient.class.getName(),
+ true);
+ } catch (MetaException e) {
+ throw new CatalogException("Failed to create Hive metastore client", e);
+ }
+ }
+
+ @Override
+ public void open() throws CatalogException {
+ if (client == null) {
+ client = getMetastoreClient(hiveConf);
+ LOG.info("Connected to Hive metastore");
+ }
+
+ if (!databaseExists(defaultDatabase)) {
+ throw new CatalogException(String.format("Configured default database %s doesn't exist in catalog %s.",
+ defaultDatabase, catalogName));
+ }
+ }
+
+ @Override
+ public void close() throws CatalogException {
+ if (client != null) {
+ client.close();
+ client = null;
+ LOG.info("Close connection to Hive metastore");
+ }
+ }
+
// ------ databases ------
+ public String getDefaultDatabase() throws CatalogException {
+ return defaultDatabase;
+ }
+
@Override
- protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) {
- return new HiveCatalogDatabase(
- hiveDatabase.getParameters(),
- hiveDatabase.getLocationUri(),
- hiveDatabase.getDescription());
+ public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
+ Database hiveDatabase = getHiveDatabase(databaseName);
+
+ Map<String, String> properties = hiveDatabase.getParameters();
+ boolean isGeneric = Boolean.valueOf(properties.get(FLINK_PROPERTY_IS_GENERIC));
+ return !isGeneric ? new HiveCatalogDatabase(properties, hiveDatabase.getLocationUri(), hiveDatabase.getDescription()) :
+ new GenericCatalogDatabase(retrieveFlinkProperties(properties), hiveDatabase.getDescription());
}
@Override
- protected Database createHiveDatabase(String databaseName, CatalogDatabase catalogDatabase) {
- HiveCatalogDatabase hiveCatalogDatabase = (HiveCatalogDatabase) catalogDatabase;
+ public void createDatabase(String databaseName, CatalogDatabase database, boolean ignoreIfExists)
+ throws DatabaseAlreadyExistException, CatalogException {
+ checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName cannot be null or empty");
+ checkNotNull(database, "database cannot be null");
+
+ Database hiveDatabase;
+ if (database instanceof HiveCatalogDatabase) {
+ hiveDatabase = instantiateHiveDatabase(databaseName, (HiveCatalogDatabase) database);
+ } else if (database instanceof GenericCatalogDatabase) {
+ hiveDatabase = instantiateHiveDatabase(databaseName, (GenericCatalogDatabase) database);
+ } else {
+ throw new CatalogException(String.format("Unsupported catalog database type %s", database.getClass()), null);
+ }
- return new Database(
- databaseName,
- catalogDatabase.getComment(),
- hiveCatalogDatabase.getLocation(),
- hiveCatalogDatabase.getProperties());
+ try {
+ client.createDatabase(hiveDatabase);
+ } catch (AlreadyExistsException e) {
+ if (!ignoreIfExists) {
+ throw new DatabaseAlreadyExistException(catalogName, hiveDatabase.getName());
+ }
+ } catch (TException e) {
+ throw new CatalogException(String.format("Failed to create database %s", hiveDatabase.getName()), e);
+ }
}
- // ------ tables and views------
+ private static Database instantiateHiveDatabase(String databaseName, HiveCatalogDatabase database) {
+ return new Database(databaseName,
+ database.getComment(),
+ database.getLocation(),
+ database.getProperties());
+ }
+
+ private static Database instantiateHiveDatabase(String databaseName, GenericCatalogDatabase database) {
+ Map<String, String> properties = database.getProperties();
+
+ return new Database(databaseName,
+ database.getComment(),
+ // HDFS location URI which GenericCatalogDatabase shouldn't care
+ null,
+ maskFlinkProperties(properties));
+ }
@Override
- protected void validateCatalogBaseTable(CatalogBaseTable table)
- throws CatalogException {
- if (!(table instanceof HiveCatalogTable) && !(table instanceof HiveCatalogView)) {
+ public void alterDatabase(String databaseName, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
+ throws DatabaseNotExistException, CatalogException {
+ checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName cannot be null or empty");
+ checkNotNull(newDatabase, "newDatabase cannot be null");
+
+ Database newHiveDatabase;
+ if (newDatabase instanceof HiveCatalogDatabase) {
+ newHiveDatabase = instantiateHiveDatabase(databaseName, (HiveCatalogDatabase) newDatabase);
+ } else if (newDatabase instanceof GenericCatalogDatabase) {
+ newHiveDatabase = instantiateHiveDatabase(databaseName, (GenericCatalogDatabase) newDatabase);
+ } else {
+ throw new CatalogException(String.format("Unsupported catalog database type %s", newDatabase.getClass()), null);
+ }
+
+ try {
+ if (databaseExists(databaseName)) {
+ client.alterDatabase(databaseName, newHiveDatabase);
+ } else if (!ignoreIfNotExists) {
+ throw new DatabaseNotExistException(catalogName, databaseName);
+ }
+ } catch (TException e) {
+ throw new CatalogException(String.format("Failed to alter database %s", databaseName), e);
+ }
+ }
+
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ try {
+ return client.getAllDatabases();
+ } catch (TException e) {
throw new CatalogException(
- "HiveCatalog can only operate on HiveCatalogTable and HiveCatalogView.");
+ String.format("Failed to list all databases in %s", catalogName), e);
}
}
@Override
- protected CatalogBaseTable createCatalogBaseTable(Table hiveTable) {
- // Table schema
- TableSchema tableSchema =
- HiveTableUtil.createTableSchema(hiveTable.getSd().getCols(), hiveTable.getPartitionKeys());
+ public boolean databaseExists(String databaseName) throws CatalogException {
+     // A database exists when the metastore returns a non-null entry for it;
+     // NoSuchObjectException is the metastore's way of saying it is absent.
+     final Database hiveDatabase;
+     try {
+         hiveDatabase = client.getDatabase(databaseName);
+     } catch (NoSuchObjectException e) {
+         return false;
+     } catch (TException e) {
+         throw new CatalogException(
+             String.format("Failed to determine whether database %s exists or not", databaseName), e);
+     }
+
+     return hiveDatabase != null;
+ }
+
+ /**
+  * Drops a database from the Hive metastore.
+  *
+  * @param name name of the database to drop
+  * @param ignoreIfNotExists if {@code true}, a missing database is silently ignored
+  * @throws DatabaseNotExistException if the database does not exist and {@code ignoreIfNotExists} is false
+  * @throws DatabaseNotEmptyException if the metastore rejects the drop with an {@code InvalidOperationException}
+  * @throws CatalogException on any other metastore failure
+  */
+ @Override
+ public void dropDatabase(String name, boolean ignoreIfNotExists) throws DatabaseNotExistException,
+         DatabaseNotEmptyException, CatalogException {
+     // Validate eagerly, like the other database operations of this catalog.
+     checkArgument(!StringUtils.isNullOrWhitespaceOnly(name), "databaseName cannot be null or empty");
+
+     try {
+         // NOTE(review): the second argument is presumably the metastore's "deleteData" flag - confirm.
+         client.dropDatabase(name, true, ignoreIfNotExists);
+     } catch (NoSuchObjectException e) {
+         if (!ignoreIfNotExists) {
+             // Keep the metastore exception as the cause so the original failure stays visible.
+             throw new DatabaseNotExistException(catalogName, name, e);
+         }
+     } catch (InvalidOperationException e) {
+         throw new DatabaseNotEmptyException(catalogName, name);
+     } catch (TException e) {
+         throw new CatalogException(String.format("Failed to drop database %s", name), e);
+     }
+ }
+
+ /**
+  * Fetches the raw Hive {@link Database} with the given name from the metastore.
+  *
+  * @param databaseName name of the database to look up
+  * @return the Hive database returned by the metastore client
+  * @throws DatabaseNotExistException if the metastore reports {@code NoSuchObjectException}
+  * @throws CatalogException on any other metastore failure
+  */
+ private Database getHiveDatabase(String databaseName) throws DatabaseNotExistException {
+     try {
+         return client.getDatabase(databaseName);
+     } catch (NoSuchObjectException e) {
+         // Keep the metastore exception as the cause instead of discarding it.
+         throw new DatabaseNotExistException(catalogName, databaseName, e);
+     } catch (TException e) {
+         throw new CatalogException(
+             String.format("Failed to get database %s from %s", databaseName, catalogName), e);
+     }
+ }
+
+ // ------ tables ------
+
+ /**
+  * Looks up a table or view in the Hive metastore and converts it to its catalog representation.
+  *
+  * @param tablePath path of the table or view; must not be null
+  * @return the catalog table or view built from the Hive table
+  * @throws TableNotExistException if the table does not exist
+  * @throws CatalogException on metastore failures
+  */
+ @Override
+ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+     checkNotNull(tablePath, "tablePath cannot be null");
+
+     return instantiateHiveCatalogTable(getHiveTable(tablePath));
+ }
+
+ /**
+  * Creates a table or view in the Hive metastore.
+  *
+  * @param tablePath path of the table to create; must not be null
+  * @param table the catalog table or view to persist; must not be null
+  * @param ignoreIfExists if {@code true}, an already existing table is silently ignored
+  * @throws TableAlreadyExistException if the table exists and {@code ignoreIfExists} is false
+  * @throws DatabaseNotExistException if the target database does not exist
+  * @throws CatalogException on any other metastore failure
+  */
+ @Override
+ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+         throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+     checkNotNull(tablePath, "tablePath cannot be null");
+     checkNotNull(table, "table cannot be null");
+
+     final String databaseName = tablePath.getDatabaseName();
+     if (!databaseExists(databaseName)) {
+         throw new DatabaseNotExistException(catalogName, databaseName);
+     }
+
+     Table hiveTable = instantiateHiveTable(tablePath, table);
+
+     try {
+         client.createTable(hiveTable);
+     } catch (AlreadyExistsException e) {
+         if (!ignoreIfExists) {
+             // Keep the metastore exception as the cause instead of discarding it.
+             throw new TableAlreadyExistException(catalogName, tablePath, e);
+         }
+     } catch (TException e) {
+         throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e);
+     }
+ }
+
+ /**
+  * Renames a table or view within its database.
+  *
+  * @param tablePath current path of the table; must not be null
+  * @param newTableName new name of the table; must not be null or blank
+  * @param ignoreIfNotExists if {@code true}, a missing source table is silently ignored
+  * @throws TableNotExistException if the source table does not exist and {@code ignoreIfNotExists} is false
+  * @throws TableAlreadyExistException if a table with the new name already exists
+  * @throws CatalogException on metastore failures
+  */
+ @Override
+ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
+         throws TableNotExistException, TableAlreadyExistException, CatalogException {
+     checkNotNull(tablePath, "tablePath cannot be null");
+     checkArgument(!StringUtils.isNullOrWhitespaceOnly(newTableName), "newTableName cannot be null or empty");
+
+     final String databaseName = tablePath.getDatabaseName();
+
+     try {
+         // alter_table() doesn't throw a clear exception when target table doesn't exist.
+         // Thus, check the table existence explicitly
+         if (!tableExists(tablePath)) {
+             if (ignoreIfNotExists) {
+                 return;
+             }
+             throw new TableNotExistException(catalogName, tablePath);
+         }
+
+         // alter_table() doesn't throw a clear exception when new table already exists.
+         // Thus, check the table existence explicitly
+         ObjectPath renamedPath = new ObjectPath(databaseName, newTableName);
+         if (tableExists(renamedPath)) {
+             throw new TableAlreadyExistException(catalogName, renamedPath);
+         }
+
+         Table hiveTable = getHiveTable(tablePath);
+         hiveTable.setTableName(newTableName);
+         client.alter_table(databaseName, tablePath.getObjectName(), hiveTable);
+     } catch (TException e) {
+         throw new CatalogException(
+             String.format("Failed to rename table %s", tablePath.getFullName()), e);
+     }
+ }
+
+ /**
+  * Replaces the definition of an existing table or view with {@code newCatalogTable}.
+  *
+  * <p>The kind of object must not change: an existing view can only be altered with a
+  * {@link CatalogView} and an existing managed table only with a {@link CatalogTable}.
+  * Any other Hive table type is rejected.
+  *
+  * @param tablePath path of the table to alter; must not be null
+  * @param newCatalogTable the new definition; must not be null
+  * @param ignoreIfNotExists if {@code true}, a missing table is silently ignored
+  * @throws TableNotExistException if the table does not exist and {@code ignoreIfNotExists} is false
+  * @throws CatalogException if the kinds don't match, the Hive table type is unsupported,
+  *     or the metastore call fails
+  */
+ @Override
+ public void alterTable(ObjectPath tablePath, CatalogBaseTable newCatalogTable, boolean ignoreIfNotExists)
+         throws TableNotExistException, CatalogException {
+     checkNotNull(tablePath, "tablePath cannot be null");
+     checkNotNull(newCatalogTable, "newCatalogTable cannot be null");
+
+     if (!tableExists(tablePath)) {
+         if (!ignoreIfNotExists) {
+             throw new TableNotExistException(catalogName, tablePath);
+         }
+         return;
+     }
+
+     Table oldTable = getHiveTable(tablePath);
+     TableType oldTableType = TableType.valueOf(oldTable.getTableType());
+
+     if (oldTableType == TableType.VIRTUAL_VIEW) {
+         if (!(newCatalogTable instanceof CatalogView)) {
+             throw new CatalogException(
+                 "Table types don't match. The existing table is a view, but the new catalog base table is not.");
+         }
+     } else if (oldTableType == TableType.MANAGED_TABLE) {
+         if (!(newCatalogTable instanceof CatalogTable)) {
+             throw new CatalogException(
+                 "Table types don't match. The existing table is a table, but the new catalog base table is not.");
+         }
+     } else {
+         throw new CatalogException(
+             String.format("Hive table type '%s' is not supported yet.", oldTableType.name()));
+     }
+
+     Table newTable = instantiateHiveTable(tablePath, newCatalogTable);
+
+     // client.alter_table() requires a valid location
+     // thus, if new table doesn't have that, it reuses location of the old table
+     if (!newTable.getSd().isSetLocation()) {
+         newTable.getSd().setLocation(oldTable.getSd().getLocation());
+     }
+
+     try {
+         client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), newTable);
+     } catch (TException e) {
+         // This is an alter, not a rename: report it as such.
+         throw new CatalogException(String.format("Failed to alter table %s", tablePath.getFullName()), e);
+     }
+ }
+
+ /**
+  * Drops a table or view from the Hive metastore.
+  *
+  * @param tablePath path of the table to drop; must not be null
+  * @param ignoreIfNotExists if {@code true}, a missing table is silently ignored
+  * @throws TableNotExistException if the table does not exist and {@code ignoreIfNotExists} is false
+  * @throws CatalogException on any other metastore failure
+  */
+ @Override
+ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
+     checkNotNull(tablePath, "tablePath cannot be null");
+
+     try {
+         client.dropTable(
+             tablePath.getDatabaseName(),
+             tablePath.getObjectName(),
+             // Indicate whether associated data should be deleted.
+             // Set to 'true' for now because Flink tables shouldn't have data in Hive. Can be changed later if necessary
+             true,
+             ignoreIfNotExists);
+     } catch (NoSuchObjectException e) {
+         if (!ignoreIfNotExists) {
+             // Keep the metastore exception as the cause instead of discarding it.
+             throw new TableNotExistException(catalogName, tablePath, e);
+         }
+     } catch (TException e) {
+         throw new CatalogException(
+             String.format("Failed to drop table %s", tablePath.getFullName()), e);
+     }
+ }
+
+ /**
+  * Lists the names of all tables the metastore reports for a database.
+  *
+  * @param databaseName name of the database; must not be null or blank
+  * @return the table names returned by {@code client.getAllTables(databaseName)}
+  * @throws DatabaseNotExistException if the metastore reports {@code UnknownDBException}
+  * @throws CatalogException on any other metastore failure
+  */
+ @Override
+ public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
+     checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName cannot be null or empty");
+
+     try {
+         return client.getAllTables(databaseName);
+     } catch (UnknownDBException e) {
+         // Keep the metastore exception as the cause instead of discarding it.
+         throw new DatabaseNotExistException(catalogName, databaseName, e);
+     } catch (TException e) {
+         throw new CatalogException(
+             String.format("Failed to list tables in database %s", databaseName), e);
+     }
+ }
+
+ /**
+  * Lists the names of all views in a database, i.e. the tables of type {@code VIRTUAL_VIEW}.
+  *
+  * @param databaseName name of the database; must not be null or blank
+  * @return the names of the views in the database
+  * @throws DatabaseNotExistException if the metastore reports {@code UnknownDBException}
+  * @throws CatalogException on any other metastore failure
+  */
+ @Override
+ public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
+     checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName cannot be null or empty");
+
+     try {
+         return client.getTables(
+             databaseName,
+             null, // table pattern
+             TableType.VIRTUAL_VIEW);
+     } catch (UnknownDBException e) {
+         // Keep the metastore exception as the cause instead of discarding it.
+         throw new DatabaseNotExistException(catalogName, databaseName, e);
+     } catch (TException e) {
+         throw new CatalogException(
+             String.format("Failed to list views in database %s", databaseName), e);
+     }
+ }
+
+ /**
+  * Checks whether a table or view exists in the Hive metastore.
+  *
+  * @param tablePath path of the table; must not be null
+  * @return the metastore's answer, or {@code false} when the database itself is unknown
+  * @throws CatalogException on any other metastore failure
+  */
+ @Override
+ public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+     checkNotNull(tablePath, "tablePath cannot be null");
+
+     final boolean exists;
+     try {
+         exists = client.tableExists(tablePath.getDatabaseName(), tablePath.getObjectName());
+     } catch (UnknownDBException e) {
+         // An unknown database cannot contain the table.
+         return false;
+     } catch (TException e) {
+         throw new CatalogException(
+             String.format("Failed to check whether table %s exists or not.", tablePath.getFullName()), e);
+     }
+
+     return exists;
+ }
+
+ /**
+  * Fetches the raw Hive {@link Table} for the given path from the metastore.
+  *
+  * @param tablePath path of the table to look up
+  * @return the Hive table returned by the metastore client
+  * @throws TableNotExistException if the metastore reports {@code NoSuchObjectException}
+  * @throws CatalogException on any other metastore failure
+  */
+ private Table getHiveTable(ObjectPath tablePath) throws TableNotExistException {
+     try {
+         return client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
+     } catch (NoSuchObjectException e) {
+         // Keep the metastore exception as the cause instead of discarding it.
+         throw new TableNotExistException(catalogName, tablePath, e);
+     } catch (TException e) {
+         throw new CatalogException(
+             String.format("Failed to get table %s from Hive metastore", tablePath.getFullName()), e);
+     }
+ }
+
+ /**
+  * Converts a Hive metastore {@link Table} into the matching catalog table or view.
+  *
+  * <p>The result is a view when the Hive table type is {@code VIRTUAL_VIEW}, and a generic
+  * (rather than Hive) object when the table parameter {@code FLINK_PROPERTY_IS_GENERIC}
+  * parses to {@code true}. For generic objects the properties are first reduced via
+  * {@code retrieveFlinkProperties}. The comment is taken out of the properties under
+  * {@code HiveTableConfig.TABLE_COMMENT}.
+  *
+  * <p>NOTE(review): for non-generic tables the comment entry is removed from the map
+  * returned by {@code hiveTable.getParameters()} itself - confirm that mutating it is intended.
+  *
+  * @param hiveTable the Hive table to convert
+  * @return a generic or Hive catalog table or view
+  */
+ private static CatalogBaseTable instantiateHiveCatalogTable(Table hiveTable) {
+ boolean isView = TableType.valueOf(hiveTable.getTableType()) == TableType.VIRTUAL_VIEW;
// Table properties
Map<String, String> properties = hiveTable.getParameters();
-
- // Table comment
+ boolean isGeneric = Boolean.valueOf(properties.get(FLINK_PROPERTY_IS_GENERIC));
+ if (isGeneric) {
+ properties = retrieveFlinkProperties(properties);
+ }
String comment = properties.remove(HiveTableConfig.TABLE_COMMENT);
+ // Table schema
+ TableSchema tableSchema =
+ HiveTableUtil.createTableSchema(hiveTable.getSd().getCols(), hiveTable.getPartitionKeys());
+
// Partition keys
List<String> partitionKeys = new ArrayList<>();
-
if (!hiveTable.getPartitionKeys().isEmpty()) {
- partitionKeys = hiveTable.getPartitionKeys().stream()
- .map(fs -> fs.getName())
- .collect(Collectors.toList());
+ partitionKeys = hiveTable.getPartitionKeys().stream().map(fs -> fs.getName()).collect(Collectors.toList());
}
- if (TableType.valueOf(hiveTable.getTableType()) == TableType.VIRTUAL_VIEW) {
- return new HiveCatalogView(
- hiveTable.getViewOriginalText(),
- hiveTable.getViewExpandedText(),
- tableSchema,
- properties,
- comment
- );
+ if (isView) {
+ if (isGeneric) {
+ return new GenericCatalogView(
+ hiveTable.getViewOriginalText(),
+ hiveTable.getViewExpandedText(),
+ tableSchema,
+ properties,
+ comment
+ );
+ } else {
+ return new HiveCatalogView(
+ hiveTable.getViewOriginalText(),
+ hiveTable.getViewExpandedText(),
+ tableSchema,
+ properties,
+ comment
+ );
+ }
} else {
- return new HiveCatalogTable(
- tableSchema, partitionKeys, properties, comment);
+ if (isGeneric) {
+ return new GenericCatalogTable(tableSchema, partitionKeys, properties, comment);
+ } else {
+ return new HiveCatalogTable(tableSchema, partitionKeys, properties, comment);
+ }
}
}
- @Override
- protected Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) {
- Map<String, String> properties = new HashMap<>(table.getProperties());
-
- // Table comment
- properties.put(HiveTableConfig.TABLE_COMMENT, table.getComment());
-
+ private static Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table) {
Table hiveTable = new Table();
hiveTable.setDbName(tablePath.getDatabaseName());
hiveTable.setTableName(tablePath.getObjectName());
hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
+ Map<String, String> properties = new HashMap<>(table.getProperties());
+ // Table comment
+ properties.put(HiveTableConfig.TABLE_COMMENT, table.getComment());
+ if (table instanceof GenericCatalogTable || table instanceof GenericCatalogView) {
+ properties = maskFlinkProperties(properties);
+ }
// Table properties
hiveTable.setParameters(properties);
// Hive table's StorageDescriptor
// TODO: This is very basic Hive table.
- // [FLINK-11479] Add input/output format and SerDeLib information for Hive tables in HiveCatalogUtil#createHiveTable
+ // [FLINK-11479] Add input/output format and SerDeLib information for Hive tables.
StorageDescriptor sd = new StorageDescriptor();
+ hiveTable.setSd(sd);
sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>()));
List<FieldSchema> allColumns = HiveTableUtil.createHiveColumns(table.getSchema());
// Table columns and partition keys
- if (table instanceof HiveCatalogTable) {
- HiveCatalogTable catalogTable = (HiveCatalogTable) table;
+ if (table instanceof AbstractCatalogTable) {
+ AbstractCatalogTable catalogTable = (AbstractCatalogTable) table;
if (catalogTable.isPartitioned()) {
int partitionKeySize = catalogTable.getPartitionKeys().size();
@@ -177,8 +568,8 @@ public class HiveCatalog extends HiveCatalogBase {
sd.setCols(allColumns);
hiveTable.setPartitionKeys(new ArrayList<>());
}
- } else if (table instanceof HiveCatalogView) {
- HiveCatalogView view = (HiveCatalogView) table;
+ } else if (table instanceof AbstractCatalogView) {
+ AbstractCatalogView view = (AbstractCatalogView) table;
// TODO: [FLINK-12398] Support partitioned view in catalog API
sd.setCols(allColumns);
@@ -192,11 +583,27 @@ public class HiveCatalog extends HiveCatalogBase {
"HiveCatalog only supports HiveCatalogTable and HiveCatalogView");
}
- hiveTable.setSd(sd);
-
return hiveTable;
}
+ /**
+  * Filter out Hive-created properties, and return Flink-created properties.
+  *
+  * <p>Only entries whose key starts with {@code FLINK_PROPERTY_PREFIX} are kept, and exactly that
+  * leading prefix is stripped from the key. Later occurrences of the prefix text inside a key
+  * are left untouched. Entries with a null key or value are skipped.
+  *
+  * @param hiveTableParams the parameters stored on the Hive object
+  * @return a new map of the Flink-created properties with their original (unprefixed) keys
+  */
+ private static Map<String, String> retrieveFlinkProperties(Map<String, String> hiveTableParams) {
+     final int prefixLength = FLINK_PROPERTY_PREFIX.length();
+
+     return hiveTableParams.entrySet().stream()
+         .filter(e -> e.getKey() != null && e.getValue() != null)
+         .filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
+         // substring() removes only the leading prefix; replace() would strip every occurrence.
+         .collect(Collectors.toMap(e -> e.getKey().substring(prefixLength), e -> e.getValue()));
+ }
+
+ /**
+  * Add a prefix to Flink-created properties to distinguish them from Hive-created properties.
+  *
+  * <p>Entries with a null key or a null value are left out of the result.
+  *
+  * @param properties the Flink-created properties
+  * @return a new map in which every key is prepended with {@code FLINK_PROPERTY_PREFIX}
+  */
+ private static Map<String, String> maskFlinkProperties(Map<String, String> properties) {
+     Map<String, String> masked = new HashMap<>();
+
+     for (Map.Entry<String, String> entry : properties.entrySet()) {
+         String key = entry.getKey();
+         String value = entry.getValue();
+         if (key == null || value == null) {
+             continue;
+         }
+         masked.put(FLINK_PROPERTY_PREFIX + key, value);
+     }
+
+     return masked;
+ }
+
// ------ partitions ------
@Override
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
deleted file mode 100644
index ea7e221..0000000
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
+++ /dev/null
@@ -1,432 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog.hive;
-
-import org.apache.flink.table.catalog.Catalog;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogDatabase;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogView;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
-import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.util.StringUtils;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.UnknownDBException;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Base class for catalogs backed by Hive metastore.
- */
-public abstract class HiveCatalogBase implements Catalog {
- private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogBase.class);
- private static final String DEFAULT_DB = "default";
-
- protected final String catalogName;
- protected final HiveConf hiveConf;
-
- private final String defaultDatabase;
- protected IMetaStoreClient client;
-
- public HiveCatalogBase(String catalogName, String hivemetastoreURI) {
- this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI));
- }
-
- public HiveCatalogBase(String catalogName, HiveConf hiveConf) {
- this(catalogName, DEFAULT_DB, hiveConf);
- }
-
- public HiveCatalogBase(String catalogName, String defaultDatabase, HiveConf hiveConf) {
- checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty");
- checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), "defaultDatabase cannot be null or empty");
- this.catalogName = catalogName;
- this.defaultDatabase = defaultDatabase;
- this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null");
- }
-
- private static HiveConf getHiveConf(String hiveMetastoreURI) {
- checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), "hiveMetastoreURI cannot be null or empty");
-
- HiveConf hiveConf = new HiveConf();
- hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetastoreURI);
- return hiveConf;
- }
-
- private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
- try {
- return RetryingMetaStoreClient.getProxy(
- hiveConf,
- null,
- null,
- HiveMetaStoreClient.class.getName(),
- true);
- } catch (MetaException e) {
- throw new CatalogException("Failed to create Hive metastore client", e);
- }
- }
-
- // ------ APIs ------
-
- /**
- * Validate input base table.
- *
- * @param catalogBaseTable the base table to be validated
- * @throws CatalogException thrown if the input base table is invalid.
- */
- protected abstract void validateCatalogBaseTable(CatalogBaseTable catalogBaseTable)
- throws CatalogException;
-
- /**
- * Create a CatalogBaseTable from a Hive table.
- *
- * @param hiveTable a Hive table
- * @return a CatalogBaseTable
- */
- protected abstract CatalogBaseTable createCatalogBaseTable(Table hiveTable);
-
- /**
- * Create a Hive table from a CatalogBaseTable.
- *
- * @param tablePath path of the table
- * @param table a CatalogBaseTable
- * @return a Hive table
- */
- protected abstract Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table);
-
- /**
- * Create a CatalogDatabase from a Hive database.
- *
- * @param hiveDatabase a Hive database
- * @return a CatalogDatabase
- */
- protected abstract CatalogDatabase createCatalogDatabase(Database hiveDatabase);
-
- /**
- * Create a Hive database from a CatalogDatabase.
- *
- * @param databaseName name of the database
- * @param catalogDatabase a CatalogDatabase
- * @return a Hive database
- */
- protected abstract Database createHiveDatabase(String databaseName, CatalogDatabase catalogDatabase);
-
- @Override
- public void open() throws CatalogException {
- if (client == null) {
- client = getMetastoreClient(hiveConf);
- LOG.info("Connected to Hive metastore");
- }
-
- if (!databaseExists(defaultDatabase)) {
- throw new CatalogException(String.format("Configured default database %s doesn't exist in catalog %s.",
- defaultDatabase, catalogName));
- }
- }
-
- @Override
- public void close() throws CatalogException {
- if (client != null) {
- client.close();
- client = null;
- LOG.info("Close connection to Hive metastore");
- }
- }
-
- // ------ databases ------
-
- @Override
- public String getDefaultDatabase() throws CatalogException {
- return defaultDatabase;
- }
-
- @Override
- public CatalogDatabase getDatabase(String databaseName)
- throws DatabaseNotExistException, CatalogException {
- try {
- return createCatalogDatabase(client.getDatabase(databaseName));
- } catch (NoSuchObjectException e) {
- throw new DatabaseNotExistException(catalogName, databaseName);
- } catch (TException e) {
- throw new CatalogException(
- String.format("Failed to get database %s from %s", databaseName, catalogName), e);
- }
- }
-
- @Override
- public List<String> listDatabases() throws CatalogException {
- try {
- return client.getAllDatabases();
- } catch (TException e) {
- throw new CatalogException(
- String.format("Failed to list all databases in %s", catalogName), e);
- }
- }
-
- @Override
- public boolean databaseExists(String databaseName) throws CatalogException {
- try {
- return client.getDatabase(databaseName) != null;
- } catch (NoSuchObjectException e) {
- return false;
- } catch (TException e) {
- throw new CatalogException(
- String.format("Failed to determine whether database %s exists or not", databaseName), e);
- }
- }
-
- @Override
- public void dropDatabase(String name, boolean ignoreIfNotExists) throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
- try {
- client.dropDatabase(name, true, ignoreIfNotExists);
- } catch (NoSuchObjectException e) {
- if (!ignoreIfNotExists) {
- throw new DatabaseNotExistException(catalogName, name, e);
- }
- } catch (InvalidOperationException e) {
- throw new DatabaseNotEmptyException(catalogName, name);
- } catch (TException e) {
- throw new CatalogException(String.format("Failed to drop database %s", name), e);
- }
- }
-
- @Override
- public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
- throws DatabaseAlreadyExistException, CatalogException {
- try {
- client.createDatabase(createHiveDatabase(name, database));
- } catch (AlreadyExistsException e) {
- if (!ignoreIfExists) {
- throw new DatabaseAlreadyExistException(catalogName, name, e);
- }
- } catch (TException e) {
- throw new CatalogException(String.format("Failed to create database %s", name), e);
- }
- }
-
- @Override
- public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
- throws DatabaseNotExistException, CatalogException {
- try {
- if (databaseExists(name)) {
- client.alterDatabase(name, createHiveDatabase(name, newDatabase));
- } else if (!ignoreIfNotExists) {
- throw new DatabaseNotExistException(catalogName, name);
- }
- } catch (TException e) {
- throw new CatalogException(String.format("Failed to alter database %s", name), e);
- }
- }
-
- // ------ tables ------
-
- @Override
- public CatalogBaseTable getTable(ObjectPath tablePath)
- throws TableNotExistException, CatalogException {
- return createCatalogBaseTable(getHiveTable(tablePath));
- }
-
- @Override
- public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
- throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
- validateCatalogBaseTable(table);
-
- if (!databaseExists(tablePath.getDatabaseName())) {
- throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName());
- } else {
- try {
- client.createTable(createHiveTable(tablePath, table));
- } catch (AlreadyExistsException e) {
- if (!ignoreIfExists) {
- throw new TableAlreadyExistException(catalogName, tablePath, e);
- }
- } catch (TException e) {
- throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e);
- }
- }
- }
-
- @Override
- public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
- throws TableNotExistException, TableAlreadyExistException, CatalogException {
- try {
- // alter_table() doesn't throw a clear exception when target table doesn't exist.
- // Thus, check the table existence explicitly
- if (tableExists(tablePath)) {
- ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName);
- // alter_table() doesn't throw a clear exception when new table already exists.
- // Thus, check the table existence explicitly
- if (tableExists(newPath)) {
- throw new TableAlreadyExistException(catalogName, newPath);
- } else {
- Table table = getHiveTable(tablePath);
- table.setTableName(newTableName);
- client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table);
- }
- } else if (!ignoreIfNotExists) {
- throw new TableNotExistException(catalogName, tablePath);
- }
- } catch (TException e) {
- throw new CatalogException(
- String.format("Failed to rename table %s", tablePath.getFullName()), e);
- }
- }
-
- @Override
- public void alterTable(ObjectPath tablePath, CatalogBaseTable newCatalogTable, boolean ignoreIfNotExists)
- throws TableNotExistException, CatalogException {
- validateCatalogBaseTable(newCatalogTable);
-
- try {
- if (!tableExists(tablePath)) {
- if (!ignoreIfNotExists) {
- throw new TableNotExistException(catalogName, tablePath);
- }
- return;
- }
-
- Table oldTable = getHiveTable(tablePath);
- TableType oldTableType = TableType.valueOf(oldTable.getTableType());
-
- if (oldTableType == TableType.VIRTUAL_VIEW) {
- if (!(newCatalogTable instanceof CatalogView)) {
- throw new CatalogException(
- String.format("Table types don't match. The existing table is a view, but the new catalog base table is not."));
- }
- // Else, do nothing
- } else if ((oldTableType == TableType.MANAGED_TABLE)) {
- if (!(newCatalogTable instanceof CatalogTable)) {
- throw new CatalogException(
- String.format("Table types don't match. The existing table is a table, but the new catalog base table is not."));
- }
- // Else, do nothing
- } else {
- throw new CatalogException(
- String.format("Hive table type '%s' is not supported yet.",
- oldTableType.name()));
- }
-
- Table newTable = createHiveTable(tablePath, newCatalogTable);
-
- // client.alter_table() requires a valid location
- // thus, if new table doesn't have that, it reuses location of the old table
- if (!newTable.getSd().isSetLocation()) {
- newTable.getSd().setLocation(oldTable.getSd().getLocation());
- }
-
- client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), newTable);
- } catch (TException e) {
- throw new CatalogException(
- String.format("Failed to rename table %s", tablePath.getFullName()), e);
- }
- }
-
- @Override
- public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
- throws TableNotExistException, CatalogException {
- try {
- client.dropTable(
- tablePath.getDatabaseName(),
- tablePath.getObjectName(),
- // Indicate whether associated data should be deleted.
- // Set to 'true' for now because Flink tables shouldn't have data in Hive. Can be changed later if necessary
- true,
- ignoreIfNotExists);
- } catch (NoSuchObjectException e) {
- if (!ignoreIfNotExists) {
- throw new TableNotExistException(catalogName, tablePath, e);
- }
- } catch (TException e) {
- throw new CatalogException(
- String.format("Failed to drop table %s", tablePath.getFullName()), e);
- }
- }
-
- @Override
- public List<String> listTables(String databaseName)
- throws DatabaseNotExistException, CatalogException {
- try {
- return client.getAllTables(databaseName);
- } catch (UnknownDBException e) {
- throw new DatabaseNotExistException(catalogName, databaseName, e);
- } catch (TException e) {
- throw new CatalogException(
- String.format("Failed to list tables in database %s", databaseName), e);
- }
- }
-
- @Override
- public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
- try {
- return client.getTables(
- databaseName,
- null, // table pattern
- TableType.VIRTUAL_VIEW);
- } catch (UnknownDBException e) {
- throw new DatabaseNotExistException(catalogName, databaseName, e);
- } catch (TException e) {
- throw new CatalogException(
- String.format("Failed to list views in database %s", databaseName), e);
- }
- }
-
- @Override
- public boolean tableExists(ObjectPath tablePath) throws CatalogException {
- try {
- return client.tableExists(tablePath.getDatabaseName(), tablePath.getObjectName());
- } catch (UnknownDBException e) {
- return false;
- } catch (TException e) {
- throw new CatalogException(
- String.format("Failed to check whether table %s exists or not.", tablePath.getFullName()), e);
- }
- }
-
- private Table getHiveTable(ObjectPath tablePath) throws TableNotExistException {
- try {
- return client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
- } catch (NoSuchObjectException e) {
- throw new TableNotExistException(catalogName, tablePath, e);
- } catch (TException e) {
- throw new CatalogException(
- String.format("Failed to get table %s from Hive metastore", tablePath.getFullName()), e);
- }
- }
-}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogDatabase.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogDatabase.java
index 45a6271..06ad4fc 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogDatabase.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogDatabase.java
@@ -18,61 +18,46 @@
package org.apache.flink.table.catalog.hive;
-import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.AbstractCatalogDatabase;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
/**
* A hive catalog database implementation.
*/
-public class HiveCatalogDatabase implements CatalogDatabase {
- // Property of the database
- private final Map<String, String> properties;
+public class HiveCatalogDatabase extends AbstractCatalogDatabase {
// HDFS path of the database
private final String location;
- // Comment of the database
- private final String comment;
public HiveCatalogDatabase(Map<String, String> properties, String comment) {
- this(properties, null, comment);
+ super(properties, comment);
+ location = null;
}
public HiveCatalogDatabase(Map<String, String> properties, String location, String comment) {
- this.properties = checkNotNull(properties, "properties cannot be null");
+ super(properties, comment);
this.location = location;
- this.comment = checkNotNull(comment, "comment cannot be null");
- }
-
- @Override
- public Map<String, String> getProperties() {
- return properties;
- }
-
- @Override
- public String getComment() {
- return comment;
}
@Override
public HiveCatalogDatabase copy() {
- return new HiveCatalogDatabase(new HashMap<>(properties), location, comment);
+ return new HiveCatalogDatabase(new HashMap<>(getProperties()), location, getComment());
}
@Override
public Optional<String> getDescription() {
- return Optional.of(comment);
+ return Optional.ofNullable(getComment());
}
@Override
public Optional<String> getDetailedDescription() {
- return Optional.of("This is a Hive catalog database stored in memory only");
+ return Optional.of("This is a Hive catalog database");
}
public String getLocation() {
return location;
}
+
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogTable.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogTable.java
index 5a2fbff..ee3d14e 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogTable.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogTable.java
@@ -19,8 +19,8 @@
package org.apache.flink.table.catalog.hive;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.AbstractCatalogTable;
import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogTable;
import java.util.ArrayList;
import java.util.HashMap;
@@ -28,30 +28,17 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
/**
* A Hive catalog table implementation.
*/
-public class HiveCatalogTable implements CatalogTable {
- // Schema of the table (column names and types)
- private final TableSchema tableSchema;
- // Partition keys if this is a partitioned table. It's an empty set if the table is not partitioned
- private final List<String> partitionKeys;
- // Properties of the table
- private final Map<String, String> properties;
- // Comment of the table
- private final String comment;
+public class HiveCatalogTable extends AbstractCatalogTable {
public HiveCatalogTable(
TableSchema tableSchema,
List<String> partitionKeys,
Map<String, String> properties,
String comment) {
- this.tableSchema = checkNotNull(tableSchema, "tableSchema cannot be null");
- this.partitionKeys = checkNotNull(partitionKeys, "partitionKeys cannot be null");
- this.properties = checkNotNull(properties, "properties cannot be null");
- this.comment = comment;
+ super(tableSchema, partitionKeys, properties, comment);
}
public HiveCatalogTable(
@@ -62,44 +49,27 @@ public class HiveCatalogTable implements CatalogTable {
}
@Override
- public boolean isPartitioned() {
- return !partitionKeys.isEmpty();
- }
-
- @Override
- public List<String> getPartitionKeys() {
- return partitionKeys;
- }
-
- @Override
- public Map<String, String> getProperties() {
- return properties;
- }
-
- @Override
- public TableSchema getSchema() {
- return tableSchema;
- }
-
- @Override
- public String getComment() {
- return comment;
- }
-
- @Override
public CatalogBaseTable copy() {
return new HiveCatalogTable(
- tableSchema.copy(), new ArrayList<>(partitionKeys), new HashMap<>(properties), comment);
+ getSchema().copy(), new ArrayList<>(getPartitionKeys()), new HashMap<>(getProperties()), getComment());
}
@Override
public Optional<String> getDescription() {
- return Optional.ofNullable(comment);
+ return Optional.ofNullable(getComment());
}
@Override
public Optional<String> getDetailedDescription() {
// TODO: return a detailed description
- return Optional.ofNullable(comment);
+ return Optional.ofNullable(getComment());
}
+
+ @Override
+ public Map<String, String> toProperties() {
+ // TODO: output properties that are used to auto-discover TableFactory for Hive tables.
+ Map<String, String> properties = new HashMap<>();
+ return properties;
+ }
+
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogView.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogView.java
index 9930709..927e6c7 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogView.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogView.java
@@ -19,36 +19,17 @@
package org.apache.flink.table.catalog.hive;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.AbstractCatalogView;
import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogView;
-import org.apache.flink.util.StringUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
/**
* A Hive catalog view implementation.
*/
-public class HiveCatalogView implements CatalogView {
- // Original text of the view definition.
- private final String originalQuery;
-
- // Expanded text of the original view definition
- // This is needed because the context such as current DB is
- // lost after the session, in which view is defined, is gone.
- // Expanded query text takes care of the this, as an example.
- private final String expandedQuery;
-
- // Schema of the view (column names and types)
- private final TableSchema tableSchema;
- // Properties of the view
- private final Map<String, String> properties;
- // Comment of the view
- private final String comment;
+public class HiveCatalogView extends AbstractCatalogView {
public HiveCatalogView(
String originalQuery,
@@ -56,55 +37,24 @@ public class HiveCatalogView implements CatalogView {
TableSchema tableSchema,
Map<String, String> properties,
String comment) {
- checkArgument(!StringUtils.isNullOrWhitespaceOnly(originalQuery), "original query cannot be null or empty");
- checkArgument(!StringUtils.isNullOrWhitespaceOnly(expandedQuery), "expanded query cannot be null or empty");
-
- this.originalQuery = originalQuery;
- this.expandedQuery = expandedQuery;
- this.tableSchema = checkNotNull(tableSchema, "tableSchema cannot be null");
- this.properties = checkNotNull(properties, "properties cannot be null");
- this.comment = comment;
- }
-
- @Override
- public String getOriginalQuery() {
- return originalQuery;
- }
-
- @Override
- public String getExpandedQuery() {
- return expandedQuery;
- }
-
- @Override
- public Map<String, String> getProperties() {
- return properties;
- }
-
- @Override
- public TableSchema getSchema() {
- return tableSchema;
- }
-
- @Override
- public String getComment() {
- return comment;
+ super(originalQuery, expandedQuery, tableSchema, properties, comment);
}
@Override
public CatalogBaseTable copy() {
return new HiveCatalogView(
- originalQuery, expandedQuery, tableSchema.copy(), new HashMap<>(properties), comment);
+ this.getOriginalQuery(), this.getExpandedQuery(), this.getSchema().copy(), new HashMap<>(this.getProperties()), getComment());
}
@Override
public Optional<String> getDescription() {
- return Optional.ofNullable(comment);
+ return Optional.ofNullable(getComment());
}
@Override
public Optional<String> getDetailedDescription() {
// TODO: return a detailed description
- return Optional.ofNullable(comment);
+ return Optional.ofNullable(getComment());
}
+
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
index 24e3439..5c69f50 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
@@ -44,7 +44,7 @@ public class GenericHiveMetastoreCatalogTest extends CatalogTestBase {
@BeforeClass
public static void init() throws IOException {
- catalog = HiveTestUtils.createGenericHiveMetastoreCatalog();
+ catalog = HiveTestUtils.createHiveCatalog();
catalog.open();
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
index 2618c8c..0d4367a 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
@@ -34,13 +34,6 @@ public class HiveTestUtils {
private static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
/**
- * Create a GenericHiveMetastoreCatalog with an embedded Hive Metastore.
- */
- public static GenericHiveMetastoreCatalog createGenericHiveMetastoreCatalog() throws IOException {
- return new GenericHiveMetastoreCatalog(CatalogTestBase.TEST_CATALOG_NAME, getHiveConf());
- }
-
- /**
* Create a HiveCatalog with an embedded Hive Metastore.
*/
public static HiveCatalog createHiveCatalog() throws IOException {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogDatabase.java
similarity index 63%
copy from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
copy to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogDatabase.java
index dd2b1f0..424048e 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogDatabase.java
@@ -18,24 +18,20 @@
package org.apache.flink.table.catalog;
-import java.util.HashMap;
import java.util.Map;
-import java.util.Optional;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * A generic catalog database implementation.
+ * An abstract class representing a database in a catalog.
*/
-public class GenericCatalogDatabase implements CatalogDatabase {
+public abstract class AbstractCatalogDatabase implements CatalogDatabase {
+ // Property of the database
private final Map<String, String> properties;
+ // Comment of the database
private final String comment;
- public GenericCatalogDatabase(Map<String, String> properties) {
- this(properties, null);
- }
-
- public GenericCatalogDatabase(Map<String, String> properties, String comment) {
+ public AbstractCatalogDatabase(Map<String, String> properties, String comment) {
this.properties = checkNotNull(properties, "properties cannot be null");
this.comment = comment;
}
@@ -47,21 +43,7 @@ public class GenericCatalogDatabase implements CatalogDatabase {
@Override
public String getComment() {
- return this.comment;
- }
-
- @Override
- public GenericCatalogDatabase copy() {
- return new GenericCatalogDatabase(new HashMap<>(properties), comment);
+ return comment;
}
- @Override
- public Optional<String> getDescription() {
- return Optional.of(comment);
- }
-
- @Override
- public Optional<String> getDetailedDescription() {
- return Optional.of("This is a generic catalog database stored in memory only");
- }
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogFunction.java
similarity index 58%
copy from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
copy to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogFunction.java
index dd2b1f0..f4046b0 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogFunction.java
@@ -18,50 +18,35 @@
package org.apache.flink.table.catalog;
-import java.util.HashMap;
+import org.apache.flink.util.StringUtils;
+
import java.util.Map;
-import java.util.Optional;
+import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * A generic catalog database implementation.
+ * An abstract catalog function implementation.
*/
-public class GenericCatalogDatabase implements CatalogDatabase {
+public abstract class AbstractCatalogFunction implements CatalogFunction {
+ private final String className; // Fully qualified class name of the function
private final Map<String, String> properties;
- private final String comment;
- public GenericCatalogDatabase(Map<String, String> properties) {
- this(properties, null);
- }
+ public AbstractCatalogFunction(String className, Map<String, String> properties) {
+ checkArgument(!StringUtils.isNullOrWhitespaceOnly(className), "className cannot be null or empty");
- public GenericCatalogDatabase(Map<String, String> properties, String comment) {
+ this.className = className;
this.properties = checkNotNull(properties, "properties cannot be null");
- this.comment = comment;
- }
-
- @Override
- public Map<String, String> getProperties() {
- return properties;
}
@Override
- public String getComment() {
- return this.comment;
+ public String getClassName() {
+ return this.className;
}
@Override
- public GenericCatalogDatabase copy() {
- return new GenericCatalogDatabase(new HashMap<>(properties), comment);
- }
-
- @Override
- public Optional<String> getDescription() {
- return Optional.of(comment);
+ public Map<String, String> getProperties() {
+ return this.properties;
}
- @Override
- public Optional<String> getDetailedDescription() {
- return Optional.of("This is a generic catalog database stored in memory only");
- }
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogPartition.java
similarity index 62%
copy from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
copy to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogPartition.java
index dd2b1f0..818d72a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogPartition.java
@@ -18,24 +18,18 @@
package org.apache.flink.table.catalog;
-import java.util.HashMap;
import java.util.Map;
-import java.util.Optional;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * A generic catalog database implementation.
+ * An abstract catalog partition implementation.
*/
-public class GenericCatalogDatabase implements CatalogDatabase {
+public abstract class AbstractCatalogPartition implements CatalogPartition {
private final Map<String, String> properties;
private final String comment;
- public GenericCatalogDatabase(Map<String, String> properties) {
- this(properties, null);
- }
-
- public GenericCatalogDatabase(Map<String, String> properties, String comment) {
+ public AbstractCatalogPartition(Map<String, String> properties, String comment) {
this.properties = checkNotNull(properties, "properties cannot be null");
this.comment = comment;
}
@@ -45,23 +39,8 @@ public class GenericCatalogDatabase implements CatalogDatabase {
return properties;
}
- @Override
public String getComment() {
- return this.comment;
- }
-
- @Override
- public GenericCatalogDatabase copy() {
- return new GenericCatalogDatabase(new HashMap<>(properties), comment);
+ return comment;
}
- @Override
- public Optional<String> getDescription() {
- return Optional.of(comment);
- }
-
- @Override
- public Optional<String> getDetailedDescription() {
- return Optional.of("This is a generic catalog database stored in memory only");
- }
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogTable.java
similarity index 72%
copy from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
copy to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogTable.java
index eff8305..c65ca73 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogTable.java
@@ -21,17 +21,15 @@ package org.apache.flink.table.catalog;
import org.apache.flink.table.api.TableSchema;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * A generic catalog table implementation.
+ * An abstract catalog table.
*/
-public class GenericCatalogTable implements CatalogTable {
+public abstract class AbstractCatalogTable implements CatalogTable {
// Schema of the table (column names and types)
private final TableSchema tableSchema;
// Partition keys if this is a partitioned table. It's an empty set if the table is not partitioned
@@ -41,7 +39,14 @@ public class GenericCatalogTable implements CatalogTable {
// Comment of the table
private final String comment;
- public GenericCatalogTable(
+ public AbstractCatalogTable(
+ TableSchema tableSchema,
+ Map<String, String> properties,
+ String comment) {
+ this(tableSchema, new ArrayList<>(), properties, comment);
+ }
+
+ public AbstractCatalogTable(
TableSchema tableSchema,
List<String> partitionKeys,
Map<String, String> properties,
@@ -52,13 +57,6 @@ public class GenericCatalogTable implements CatalogTable {
this.comment = comment;
}
- public GenericCatalogTable(
- TableSchema tableSchema,
- Map<String, String> properties,
- String description) {
- this(tableSchema, new ArrayList<>(), properties, description);
- }
-
@Override
public boolean isPartitioned() {
return !partitionKeys.isEmpty();
@@ -76,7 +74,7 @@ public class GenericCatalogTable implements CatalogTable {
@Override
public TableSchema getSchema() {
- return this.tableSchema;
+ return tableSchema;
}
@Override
@@ -84,20 +82,4 @@ public class GenericCatalogTable implements CatalogTable {
return comment;
}
- @Override
- public GenericCatalogTable copy() {
- return new GenericCatalogTable(
- this.tableSchema.copy(), new ArrayList<>(partitionKeys), new HashMap<>(this.properties), comment);
- }
-
- @Override
- public Optional<String> getDescription() {
- return Optional.of(comment);
- }
-
- @Override
- public Optional<String> getDetailedDescription() {
- return Optional.of("This is a catalog table in an im-memory catalog");
- }
-
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogView.java
similarity index 65%
copy from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java
copy to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogView.java
index 2a8a184..9bde0a2 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogView.java
@@ -21,17 +21,15 @@ package org.apache.flink.table.catalog;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.util.StringUtils;
-import java.util.HashMap;
import java.util.Map;
-import java.util.Optional;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * A generic catalog view implementation.
+ * An abstract catalog view.
*/
-public class GenericCatalogView implements CatalogView {
+public abstract class AbstractCatalogView implements CatalogView {
// Original text of the view definition.
private final String originalQuery;
@@ -41,25 +39,18 @@ public class GenericCatalogView implements CatalogView {
// Expanded query text takes care of the this, as an example.
private final String expandedQuery;
- // Schema of the view (column names and types)
- private final TableSchema tableSchema;
- // Properties of the view
+ private final TableSchema schema;
private final Map<String, String> properties;
- // Comment of the view
private final String comment;
- public GenericCatalogView(
- String originalQuery,
- String expandedQuery,
- TableSchema tableSchema,
- Map<String, String> properties,
- String comment) {
- checkArgument(!StringUtils.isNullOrWhitespaceOnly(originalQuery), "original query cannot be null or empty");
- checkArgument(!StringUtils.isNullOrWhitespaceOnly(expandedQuery), "expanded query cannot be null or empty");
+ public AbstractCatalogView(String originalQuery, String expandedQuery, TableSchema schema,
+ Map<String, String> properties, String comment) {
+ checkArgument(!StringUtils.isNullOrWhitespaceOnly(originalQuery), "originalQuery cannot be null or empty");
+ checkArgument(!StringUtils.isNullOrWhitespaceOnly(expandedQuery), "expandedQuery cannot be null or empty");
this.originalQuery = originalQuery;
this.expandedQuery = expandedQuery;
- this.tableSchema = checkNotNull(tableSchema, "tableSchema cannot be null");
+ this.schema = checkNotNull(schema, "schema cannot be null");
this.properties = checkNotNull(properties, "properties cannot be null");
this.comment = comment;
}
@@ -76,32 +67,16 @@ public class GenericCatalogView implements CatalogView {
@Override
public Map<String, String> getProperties() {
- return properties;
+ return this.properties;
}
@Override
public TableSchema getSchema() {
- return tableSchema;
+ return this.schema;
}
- @Override
public String getComment() {
- return comment;
+ return this.comment;
}
- @Override
- public GenericCatalogView copy() {
- return new GenericCatalogView(this.originalQuery, this.expandedQuery, tableSchema.copy(),
- new HashMap<>(this.properties), comment);
- }
-
- @Override
- public Optional<String> getDescription() {
- return Optional.of(comment);
- }
-
- @Override
- public Optional<String> getDetailedDescription() {
- return Optional.of("This is a catalog view in an im-memory catalog");
- }
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
index dd2b1f0..44cf934 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
@@ -22,46 +22,29 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
/**
* A generic catalog database implementation.
*/
-public class GenericCatalogDatabase implements CatalogDatabase {
- private final Map<String, String> properties;
- private final String comment;
-
- public GenericCatalogDatabase(Map<String, String> properties) {
- this(properties, null);
- }
+public class GenericCatalogDatabase extends AbstractCatalogDatabase {
public GenericCatalogDatabase(Map<String, String> properties, String comment) {
- this.properties = checkNotNull(properties, "properties cannot be null");
- this.comment = comment;
- }
-
- @Override
- public Map<String, String> getProperties() {
- return properties;
- }
-
- @Override
- public String getComment() {
- return this.comment;
+ super(properties, comment);
+ properties.put(GenericInMemoryCatalog.FLINK_IS_GENERIC_KEY, GenericInMemoryCatalog.FLINK_IS_GENERIC_VALUE);
}
@Override
public GenericCatalogDatabase copy() {
- return new GenericCatalogDatabase(new HashMap<>(properties), comment);
+ return new GenericCatalogDatabase(new HashMap<>(getProperties()), getComment());
}
@Override
public Optional<String> getDescription() {
- return Optional.of(comment);
+ return Optional.of(getComment());
}
@Override
public Optional<String> getDetailedDescription() {
- return Optional.of("This is a generic catalog database stored in memory only");
+ return Optional.of("This is a generic catalog database");
}
+
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogFunction.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogFunction.java
index 5c0176d..fbfe271 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogFunction.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogFunction.java
@@ -25,33 +25,24 @@ import java.util.Optional;
/**
* A generic catalog function implementation.
*/
-public class GenericCatalogFunction implements CatalogFunction {
-
- private final String className; // Fully qualified class name of the function
- private final Map<String, String> properties;
-
- public GenericCatalogFunction(String className) {
- this(className, new HashMap<>());
- }
+public class GenericCatalogFunction extends AbstractCatalogFunction {
public GenericCatalogFunction(String className, Map<String, String> properties) {
- this.className = className;
- this.properties = properties;
+ super(className, properties);
+ properties.put(GenericInMemoryCatalog.FLINK_IS_GENERIC_KEY, GenericInMemoryCatalog.FLINK_IS_GENERIC_VALUE);
}
@Override
- public String getClassName() {
- return this.className;
- }
-
- @Override
- public Map<String, String> getProperties() {
- return this.properties;
+ public GenericCatalogFunction copy() {
+ return new GenericCatalogFunction(getClassName(), new HashMap<>(getProperties()));
}
@Override
- public GenericCatalogFunction copy() {
- return new GenericCatalogFunction(className, new HashMap<>(properties));
+ public String toString() {
+ return "GenericCatalogFunction{" +
+ ", className='" + getClassName() + '\'' +
+ ", properties=" + getProperties() +
+ '}';
}
@Override
@@ -61,15 +52,7 @@ public class GenericCatalogFunction implements CatalogFunction {
@Override
public Optional<String> getDetailedDescription() {
- return Optional.of("This is a user-defined function in an in-memory catalog implementation");
- }
-
- @Override
- public String toString() {
- return "GenericCatalogFunction{" +
- ", className='" + className + '\'' +
- ", properties=" + properties +
- '}';
+ return Optional.of("This is a user-defined function");
}
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogPartition.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogPartition.java
index 085278e..37bc5e1 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogPartition.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogPartition.java
@@ -25,37 +25,26 @@ import java.util.Optional;
/**
* A generic catalog partition implementation.
*/
-public class GenericCatalogPartition implements CatalogPartition {
- private final Map<String, String> properties;
-
- private String comment = "This is a generic catalog partition";
-
- public GenericCatalogPartition(Map<String, String> properties) {
- this.properties = properties;
- }
+public class GenericCatalogPartition extends AbstractCatalogPartition {
public GenericCatalogPartition(Map<String, String> properties, String comment) {
- this(properties);
- this.comment = comment;
- }
-
- @Override
- public Map<String, String> getProperties() {
- return properties;
+ super(properties, comment);
+ properties.put(GenericInMemoryCatalog.FLINK_IS_GENERIC_KEY, GenericInMemoryCatalog.FLINK_IS_GENERIC_VALUE);
}
@Override
public CatalogPartition copy() {
- return new GenericCatalogPartition(new HashMap<>(properties));
+ return new GenericCatalogPartition(new HashMap<>(getProperties()), getComment());
}
@Override
public Optional<String> getDescription() {
- return Optional.of(comment);
+ return Optional.of(getComment());
}
@Override
public Optional<String> getDetailedDescription() {
return Optional.of("This is a generic catalog partition with detailed description");
}
+
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
index eff8305..0eff977 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
@@ -26,73 +26,43 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
/**
* A generic catalog table implementation.
*/
-public class GenericCatalogTable implements CatalogTable {
- // Schema of the table (column names and types)
- private final TableSchema tableSchema;
- // Partition keys if this is a partitioned table. It's an empty set if the table is not partitioned
- private final List<String> partitionKeys;
- // Properties of the table
- private final Map<String, String> properties;
- // Comment of the table
- private final String comment;
+public class GenericCatalogTable extends AbstractCatalogTable {
public GenericCatalogTable(
- TableSchema tableSchema,
- List<String> partitionKeys,
- Map<String, String> properties,
- String comment) {
- this.tableSchema = checkNotNull(tableSchema, "tableSchema cannot be null");
- this.partitionKeys = checkNotNull(partitionKeys, "partitionKeys cannot be null");
- this.properties = checkNotNull(properties, "properties cannot be null");
- this.comment = comment;
+ TableSchema tableSchema,
+ Map<String, String> properties,
+ String comment) {
+ this(tableSchema, new ArrayList<>(), properties, comment);
}
public GenericCatalogTable(
TableSchema tableSchema,
+ List<String> partitionKeys,
Map<String, String> properties,
- String description) {
- this(tableSchema, new ArrayList<>(), properties, description);
- }
-
- @Override
- public boolean isPartitioned() {
- return !partitionKeys.isEmpty();
+ String comment) {
+ super(tableSchema, partitionKeys, properties, comment);
+ properties.put(GenericInMemoryCatalog.FLINK_IS_GENERIC_KEY, GenericInMemoryCatalog.FLINK_IS_GENERIC_VALUE);
}
@Override
- public List<String> getPartitionKeys() {
- return partitionKeys;
+ public GenericCatalogTable copy() {
+ return new GenericCatalogTable(
+ getSchema().copy(), new ArrayList<>(getPartitionKeys()), new HashMap<>(getProperties()), getComment());
}
@Override
- public Map<String, String> getProperties() {
+ public Map<String, String> toProperties() {
+ // TODO: Filter out ANY properties that are not needed for table discovery.
+ Map<String, String> properties = new HashMap<>();
return properties;
}
@Override
- public TableSchema getSchema() {
- return this.tableSchema;
- }
-
- @Override
- public String getComment() {
- return comment;
- }
-
- @Override
- public GenericCatalogTable copy() {
- return new GenericCatalogTable(
- this.tableSchema.copy(), new ArrayList<>(partitionKeys), new HashMap<>(this.properties), comment);
- }
-
- @Override
public Optional<String> getDescription() {
- return Optional.of(comment);
+ return Optional.of(getComment());
}
@Override
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java
index 2a8a184..96e019f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java
@@ -19,89 +19,36 @@
package org.apache.flink.table.catalog;
import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.util.StringUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
/**
* A generic catalog view implementation.
*/
-public class GenericCatalogView implements CatalogView {
- // Original text of the view definition.
- private final String originalQuery;
-
- // Expanded text of the original view definition
- // This is needed because the context such as current DB is
- // lost after the session, in which view is defined, is gone.
- // Expanded query text takes care of the this, as an example.
- private final String expandedQuery;
-
- // Schema of the view (column names and types)
- private final TableSchema tableSchema;
- // Properties of the view
- private final Map<String, String> properties;
- // Comment of the view
- private final String comment;
-
- public GenericCatalogView(
- String originalQuery,
- String expandedQuery,
- TableSchema tableSchema,
- Map<String, String> properties,
- String comment) {
- checkArgument(!StringUtils.isNullOrWhitespaceOnly(originalQuery), "original query cannot be null or empty");
- checkArgument(!StringUtils.isNullOrWhitespaceOnly(expandedQuery), "expanded query cannot be null or empty");
-
- this.originalQuery = originalQuery;
- this.expandedQuery = expandedQuery;
- this.tableSchema = checkNotNull(tableSchema, "tableSchema cannot be null");
- this.properties = checkNotNull(properties, "properties cannot be null");
- this.comment = comment;
- }
-
- @Override
- public String getOriginalQuery() {
- return this.originalQuery;
- }
-
- @Override
- public String getExpandedQuery() {
- return this.expandedQuery;
- }
-
- @Override
- public Map<String, String> getProperties() {
- return properties;
- }
+public class GenericCatalogView extends AbstractCatalogView {
- @Override
- public TableSchema getSchema() {
- return tableSchema;
- }
-
- @Override
- public String getComment() {
- return comment;
+ public GenericCatalogView(String originalQuery, String expandedQuery, TableSchema schema,
+ Map<String, String> properties, String comment) {
+ super(originalQuery, expandedQuery, schema, properties, comment);
+ properties.put(GenericInMemoryCatalog.FLINK_IS_GENERIC_KEY, GenericInMemoryCatalog.FLINK_IS_GENERIC_VALUE);
}
@Override
public GenericCatalogView copy() {
- return new GenericCatalogView(this.originalQuery, this.expandedQuery, tableSchema.copy(),
- new HashMap<>(this.properties), comment);
+ return new GenericCatalogView(getOriginalQuery(), getExpandedQuery(), getSchema().copy(),
+ new HashMap<>(getProperties()), getComment());
}
@Override
public Optional<String> getDescription() {
- return Optional.of(comment);
+ return Optional.of(getComment());
}
@Override
public Optional<String> getDetailedDescription() {
return Optional.of("This is a catalog view in an im-memory catalog");
}
+
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
index 6a1f5df..801aabb 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
@@ -48,6 +48,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* A generic catalog implementation that holds all meta objects in memory.
*/
public class GenericInMemoryCatalog implements Catalog {
+ public static final String FLINK_IS_GENERIC_KEY = "is_generic";
+ public static final String FLINK_IS_GENERIC_VALUE = "true";
+
private static final String DEFAULT_DB = "default";
private final String defaultDatabase;
@@ -74,7 +77,7 @@ public class GenericInMemoryCatalog implements Catalog {
this.catalogName = name;
this.defaultDatabase = defaultDatabase;
this.databases = new LinkedHashMap<>();
- this.databases.put(defaultDatabase, new GenericCatalogDatabase(new HashMap<>()));
+ this.databases.put(defaultDatabase, new GenericCatalogDatabase(new HashMap<>(), ""));
this.tables = new LinkedHashMap<>();
this.functions = new LinkedHashMap<>();
this.partitions = new LinkedHashMap<>();
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index a013aed..f3d144f 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -741,15 +741,15 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
}
private CatalogPartition createPartition() {
- return new GenericCatalogPartition(getBatchTableProperties());
+ return new GenericCatalogPartition(getBatchTableProperties(), "Generic batch table");
}
private CatalogPartition createAnotherPartition() {
- return new GenericCatalogPartition(getBatchTableProperties());
+ return new GenericCatalogPartition(getBatchTableProperties(), "Generic batch table");
}
private CatalogPartition createPartition(Map<String, String> props) {
- return new GenericCatalogPartition(props);
+ return new GenericCatalogPartition(props, "Generic catalog table");
}
@Override
@@ -791,11 +791,11 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
}
protected CatalogFunction createFunction() {
- return new GenericCatalogFunction(MyScalarFunction.class.getName());
+ return new GenericCatalogFunction(MyScalarFunction.class.getName(), new HashMap<>());
}
protected CatalogFunction createAnotherFunction() {
- return new GenericCatalogFunction(MyOtherScalarFunction.class.getName());
+ return new GenericCatalogFunction(MyOtherScalarFunction.class.getName(), new HashMap<>());
}
/**
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
index e38e930..46e327e 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.catalog;
import java.util.List;
+import java.util.Map;
/**
* Represents a table in a catalog.
@@ -37,4 +38,12 @@ public interface CatalogTable extends CatalogBaseTable {
* @return partition keys of the table
*/
List<String> getPartitionKeys();
+
+ /**
+ * Returns a property map for table factory discovery purposes. The properties will be used to match a {@code TableFactory}.
+ * Please refer to {@link org.apache.flink.table.factories.TableFactory}.
+ *
+ * @return a map of properties
+ */
+ Map<String, String> toProperties();
}