You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/05/06 21:53:27 UTC
[flink] branch master updated: [FLINK-12232][hive] Support database
related operations in HiveCatalog
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a3cf3f1 [FLINK-12232][hive] Support database related operations in HiveCatalog
a3cf3f1 is described below
commit a3cf3f1fc69e9ed17433cbc522c7086cead7790e
Author: bowen.li <bo...@gmail.com>
AuthorDate: Thu May 2 21:05:04 2019 -0700
[FLINK-12232][hive] Support database related operations in HiveCatalog
This PR creates HiveCatalog in the flink-connector-hive module and implements the database-related APIs for HiveCatalog.
This closes #8837
---
.../catalog/hive/GenericHiveMetastoreCatalog.java | 162 ++--------------
.../flink/table/catalog/hive/HiveCatalog.java | 210 +++++++++++++++++++++
.../flink/table/catalog/hive/HiveCatalogBase.java | 200 ++++++++++++++++++++
.../table/catalog/hive/HiveCatalogDatabase.java | 51 +++--
.../flink/table/catalog/hive/HiveCatalogUtil.java | 45 ++---
.../hive/GenericHiveMetastoreCatalogTest.java | 2 +-
.../flink/table/catalog/hive/HiveCatalogTest.java | 179 ++++++++++++++++++
.../table/catalog/GenericCatalogDatabase.java | 20 +-
.../flink/table/catalog/CatalogDatabase.java | 7 +
.../flink/table/catalog/CatalogTestBase.java | 5 +
10 files changed, 679 insertions(+), 202 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
index 50ed2e9..bb431cc 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
@@ -25,10 +25,8 @@ import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.GenericCatalogDatabase;
import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.ReadableWritableCatalog;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
@@ -38,16 +36,10 @@ import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
-import org.apache.flink.util.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
-import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
@@ -57,173 +49,43 @@ import org.slf4j.LoggerFactory;
import java.util.List;
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
/**
* A catalog that persists all Flink streaming and batch metadata by using Hive metastore as a persistent storage.
*/
-public class GenericHiveMetastoreCatalog implements ReadableWritableCatalog {
+public class GenericHiveMetastoreCatalog extends HiveCatalogBase {
private static final Logger LOG = LoggerFactory.getLogger(GenericHiveMetastoreCatalog.class);
- public static final String DEFAULT_DB = "default";
-
- private final String catalogName;
- private final HiveConf hiveConf;
-
- private String currentDatabase = DEFAULT_DB;
- private IMetaStoreClient client;
-
public GenericHiveMetastoreCatalog(String catalogName, String hivemetastoreURI) {
- this(catalogName, getHiveConf(hivemetastoreURI));
- }
+ super(catalogName, hivemetastoreURI);
- public GenericHiveMetastoreCatalog(String catalogName, HiveConf hiveConf) {
- checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty");
- this.catalogName = catalogName;
-
- this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null");
LOG.info("Created GenericHiveMetastoreCatalog '{}'", catalogName);
}
- private static HiveConf getHiveConf(String hiveMetastoreURI) {
- checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), "hiveMetastoreURI cannot be null or empty");
-
- HiveConf hiveConf = new HiveConf();
- hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetastoreURI);
- return hiveConf;
- }
-
- private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
- try {
- return RetryingMetaStoreClient.getProxy(
- hiveConf,
- null,
- null,
- HiveMetaStoreClient.class.getName(),
- true);
- } catch (MetaException e) {
- throw new CatalogException("Failed to create Hive metastore client", e);
- }
- }
-
- @Override
- public void open() throws CatalogException {
- if (client == null) {
- client = getMetastoreClient(hiveConf);
- LOG.info("Connected to Hive metastore");
- }
- }
+ public GenericHiveMetastoreCatalog(String catalogName, HiveConf hiveConf) {
+ super(catalogName, hiveConf);
- @Override
- public void close() throws CatalogException {
- if (client != null) {
- client.close();
- client = null;
- LOG.info("Close connection to Hive metastore");
- }
+ LOG.info("Created GenericHiveMetastoreCatalog '{}'", catalogName);
}
// ------ databases ------
@Override
- public String getCurrentDatabase() throws CatalogException {
- return currentDatabase;
- }
-
- @Override
- public void setCurrentDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
- checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
-
- if (!databaseExists(databaseName)) {
- throw new DatabaseNotExistException(catalogName, databaseName);
- }
-
- currentDatabase = databaseName;
- }
-
- @Override
- public List<String> listDatabases() throws CatalogException {
- try {
- return client.getAllDatabases();
- } catch (TException e) {
- throw new CatalogException(
- String.format("Failed to list all databases in %s", catalogName), e);
- }
- }
-
- @Override
public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
- Database hiveDb;
-
- try {
- hiveDb = client.getDatabase(databaseName);
- } catch (NoSuchObjectException e) {
- throw new DatabaseNotExistException(catalogName, databaseName);
- } catch (TException e) {
- throw new CatalogException(
- String.format("Failed to get database %s from %s", databaseName, catalogName), e);
- }
+ Database hiveDb = getHiveDatabase(databaseName);
return new GenericCatalogDatabase(hiveDb.getParameters(), hiveDb.getDescription());
}
@Override
- public boolean databaseExists(String databaseName) throws CatalogException {
- try {
- return client.getDatabase(databaseName) != null;
- } catch (NoSuchObjectException e) {
- return false;
- } catch (TException e) {
- throw new CatalogException(
- String.format("Failed to determine whether database %s exists or not", databaseName), e);
- }
+ public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
+ throws DatabaseAlreadyExistException, CatalogException {
+ createHiveDatabase(GenericHiveMetastoreCatalogUtil.createHiveDatabase(name, database), ignoreIfExists);
}
@Override
- public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) throws DatabaseAlreadyExistException, CatalogException {
-
- try {
- client.createDatabase(GenericHiveMetastoreCatalogUtil.createHiveDatabase(name, database));
- } catch (AlreadyExistsException e) {
- if (!ignoreIfExists) {
- throw new DatabaseAlreadyExistException(catalogName, name);
- }
- } catch (TException e) {
- throw new CatalogException(String.format("Failed to create database %s", name), e);
- }
- }
-
- @Override
- public void dropDatabase(String name, boolean ignoreIfNotExists) throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
- try {
- client.dropDatabase(name, true, ignoreIfNotExists);
- } catch (NoSuchObjectException e) {
- if (!ignoreIfNotExists) {
- throw new DatabaseNotExistException(catalogName, name);
- }
- } catch (InvalidOperationException e) {
- if (e.getMessage().startsWith(String.format("Database %s is not empty", name))) {
- throw new DatabaseNotEmptyException(catalogName, name);
- } else {
- throw new CatalogException(String.format("Failed to drop database %s", name), e);
- }
- } catch (TException e) {
- throw new CatalogException(String.format("Failed to drop database %s", name), e);
- }
- }
-
- @Override
- public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException {
- try {
- if (databaseExists(name)) {
- client.alterDatabase(name, GenericHiveMetastoreCatalogUtil.createHiveDatabase(name, newDatabase));
- } else if (!ignoreIfNotExists) {
- throw new DatabaseNotExistException(catalogName, name);
- }
- } catch (TException e) {
- throw new CatalogException(String.format("Failed to alter database %s", name), e);
- }
+ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
+ throws DatabaseNotExistException, CatalogException {
+ alterHiveDatabase(name, GenericHiveMetastoreCatalogUtil.createHiveDatabase(name, newDatabase), ignoreIfNotExists);
}
// ------ tables and views------
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
new file mode 100644
index 0000000..7f96852
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * A catalog implementation for Hive.
+ *
+ * <p>Only the database operations are implemented in this class; every table, view, partition
+ * and function operation below throws {@link UnsupportedOperationException}.
+ */
+public class HiveCatalog extends HiveCatalogBase {
+	private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);
+
+	/**
+	 * Creates a catalog from a metastore URI. The arguments are handed to the base class unchanged.
+	 */
+	public HiveCatalog(String catalogName, String hivemetastoreURI) {
+		super(catalogName, hivemetastoreURI);
+
+		LOG.info("Created HiveCatalog '{}'", catalogName);
+	}
+
+	/**
+	 * Creates a catalog from an existing {@link HiveConf}. The arguments are handed to the base class unchanged.
+	 */
+	public HiveCatalog(String catalogName, HiveConf hiveConf) {
+		super(catalogName, hiveConf);
+
+		LOG.info("Created HiveCatalog '{}'", catalogName);
+	}
+
+	// ------ databases ------
+
+	/**
+	 * Reads the database from the metastore and wraps its parameters, location URI and
+	 * description in a {@link HiveCatalogDatabase}.
+	 *
+	 * <p>NOTE(review): the three-argument {@link HiveCatalogDatabase} constructor rejects a null
+	 * comment and a null/blank location - confirm the metastore always returns both.
+	 */
+	@Override
+	public CatalogDatabase getDatabase(String databaseName)
+			throws DatabaseNotExistException, CatalogException {
+		final Database metastoreDb = getHiveDatabase(databaseName);
+
+		return new HiveCatalogDatabase(
+			metastoreDb.getParameters(),
+			metastoreDb.getLocationUri(),
+			metastoreDb.getDescription());
+	}
+
+	/**
+	 * Converts the given database to its Hive form and creates it in the metastore.
+	 */
+	@Override
+	public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
+			throws DatabaseAlreadyExistException, CatalogException {
+		final Database toCreate = HiveCatalogUtil.createHiveDatabase(name, database);
+		createHiveDatabase(toCreate, ignoreIfExists);
+	}
+
+	/**
+	 * Converts the given database to its Hive form and uses it to alter the named database.
+	 */
+	@Override
+	public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
+			throws DatabaseNotExistException, CatalogException {
+		final Database replacement = HiveCatalogUtil.createHiveDatabase(name, newDatabase);
+		alterHiveDatabase(name, replacement, ignoreIfNotExists);
+	}
+
+	// ------ tables and views------
+	// Not implemented: each method throws UnsupportedOperationException.
+
+	@Override
+	public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+			throws TableNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
+			throws TableNotExistException, TableAlreadyExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+			throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void alterTable(ObjectPath tableName, CatalogBaseTable newTable, boolean ignoreIfNotExists)
+			throws TableNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public List<String> listTables(String databaseName)
+			throws DatabaseNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public List<String> listViews(String databaseName)
+			throws DatabaseNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public CatalogBaseTable getTable(ObjectPath objectPath)
+			throws TableNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean tableExists(ObjectPath objectPath)
+			throws CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	// ------ partitions ------
+	// Not implemented: each method throws UnsupportedOperationException.
+
+	@Override
+	public void createPartition(
+			ObjectPath tablePath,
+			CatalogPartitionSpec partitionSpec,
+			CatalogPartition partition,
+			boolean ignoreIfExists)
+			throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
+			throws PartitionNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void alterPartition(
+			ObjectPath tablePath,
+			CatalogPartitionSpec partitionSpec,
+			CatalogPartition newPartition,
+			boolean ignoreIfNotExists)
+			throws PartitionNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
+			throws TableNotExistException, TableNotPartitionedException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+			throws TableNotExistException, TableNotPartitionedException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+			throws PartitionNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+			throws CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	// ------ functions ------
+	// Not implemented: each method throws UnsupportedOperationException.
+
+	@Override
+	public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
+			throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
+			throws FunctionNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
+			throws FunctionNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public List<String> listFunctions(String dbName)
+			throws DatabaseNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public CatalogFunction getFunction(ObjectPath functionPath)
+			throws FunctionNotExistException, CatalogException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean functionExists(ObjectPath functionPath)
+			throws CatalogException {
+		throw new UnsupportedOperationException();
+	}
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
new file mode 100644
index 0000000..b044de3
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.ReadableWritableCatalog;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for catalogs backed by Hive metastore.
+ *
+ * <p>Holds the catalog name, the Hive configuration, the current database name and the
+ * metastore client, and implements the database operations that only need the client.
+ * The client is created in {@link #open()} and released in {@link #close()}.
+ */
+public abstract class HiveCatalogBase implements ReadableWritableCatalog {
+	private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogBase.class);
+
+	public static final String DEFAULT_DB = "default";
+
+	protected final String catalogName;
+	protected final HiveConf hiveConf;
+
+	protected String currentDatabase = DEFAULT_DB;
+	protected IMetaStoreClient client;
+
+	/**
+	 * Builds the catalog from a metastore URI by wrapping the URI in a fresh {@link HiveConf}.
+	 */
+	public HiveCatalogBase(String catalogName, String hivemetastoreURI) {
+		this(catalogName, getHiveConf(hivemetastoreURI));
+	}
+
+	/**
+	 * Builds the catalog from an existing configuration.
+	 *
+	 * @param catalogName name of the catalog, must not be null or blank
+	 * @param hiveConf Hive configuration, must not be null
+	 */
+	public HiveCatalogBase(String catalogName, HiveConf hiveConf) {
+		final boolean nameMissing = StringUtils.isNullOrWhitespaceOnly(catalogName);
+		checkArgument(!nameMissing, "catalogName cannot be null or empty");
+
+		this.catalogName = catalogName;
+		this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null");
+	}
+
+	/** Creates a {@link HiveConf} whose metastore URIs setting is the given URI. */
+	private static HiveConf getHiveConf(String hiveMetastoreURI) {
+		final boolean uriMissing = StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI);
+		checkArgument(!uriMissing, "hiveMetastoreURI cannot be null or empty");
+
+		final HiveConf conf = new HiveConf();
+		conf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetastoreURI);
+		return conf;
+	}
+
+	/** Obtains a metastore client proxy, translating {@link MetaException} into {@link CatalogException}. */
+	private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
+		final String clientClassName = HiveMetaStoreClient.class.getName();
+		try {
+			return RetryingMetaStoreClient.getProxy(hiveConf, null, null, clientClassName, true);
+		} catch (MetaException e) {
+			throw new CatalogException("Failed to create Hive metastore client", e);
+		}
+	}
+
+	/** Creates the metastore client if there is none yet; otherwise does nothing. */
+	@Override
+	public void open() throws CatalogException {
+		if (client != null) {
+			return;
+		}
+
+		client = getMetastoreClient(hiveConf);
+		LOG.info("Connected to Hive metastore");
+	}
+
+	/** Closes and forgets the metastore client if there is one; otherwise does nothing. */
+	@Override
+	public void close() throws CatalogException {
+		if (client == null) {
+			return;
+		}
+
+		client.close();
+		client = null;
+		LOG.info("Close connection to Hive metastore");
+	}
+
+	// ------ databases ------
+
+	@Override
+	public String getCurrentDatabase() throws CatalogException {
+		return currentDatabase;
+	}
+
+	/**
+	 * Switches the current database after verifying that the name is non-blank and that the
+	 * database exists in the metastore.
+	 */
+	@Override
+	public void setCurrentDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+
+		if (databaseExists(databaseName)) {
+			currentDatabase = databaseName;
+			return;
+		}
+
+		throw new DatabaseNotExistException(catalogName, databaseName);
+	}
+
+	@Override
+	public List<String> listDatabases() throws CatalogException {
+		try {
+			final List<String> allDatabases = client.getAllDatabases();
+			return allDatabases;
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to list all databases in %s", catalogName), e);
+		}
+	}
+
+	/**
+	 * Returns whether the metastore knows the database; a {@link NoSuchObjectException} from the
+	 * client is taken to mean "does not exist".
+	 */
+	@Override
+	public boolean databaseExists(String databaseName) throws CatalogException {
+		try {
+			final Database found = client.getDatabase(databaseName);
+			return found != null;
+		} catch (NoSuchObjectException e) {
+			return false;
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to determine whether database %s exists or not", databaseName), e);
+		}
+	}
+
+	/**
+	 * Drops the database through the metastore client, mapping the client's exceptions onto the
+	 * catalog's exceptions.
+	 */
+	@Override
+	public void dropDatabase(String name, boolean ignoreIfNotExists)
+			throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
+		try {
+			client.dropDatabase(name, true, ignoreIfNotExists);
+		} catch (NoSuchObjectException e) {
+			if (ignoreIfNotExists) {
+				return;
+			}
+			throw new DatabaseNotExistException(catalogName, name);
+		} catch (InvalidOperationException e) {
+			// Every InvalidOperationException from the client is reported as "database not empty".
+			throw new DatabaseNotEmptyException(catalogName, name);
+		} catch (TException e) {
+			throw new CatalogException(String.format("Failed to drop database %s", name), e);
+		}
+	}
+
+	/**
+	 * Fetches the Hive database object for the given name.
+	 *
+	 * @throws DatabaseNotExistException if the client reports that no such database exists
+	 * @throws CatalogException on any other client failure
+	 */
+	protected Database getHiveDatabase(String databaseName) throws DatabaseNotExistException {
+		try {
+			final Database hiveDatabase = client.getDatabase(databaseName);
+			return hiveDatabase;
+		} catch (NoSuchObjectException e) {
+			throw new DatabaseNotExistException(catalogName, databaseName);
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to get database %s from %s", databaseName, catalogName), e);
+		}
+	}
+
+	/**
+	 * Creates the given Hive database in the metastore.
+	 *
+	 * @throws DatabaseAlreadyExistException if it already exists and {@code ignoreIfExists} is false
+	 * @throws CatalogException on any other client failure
+	 */
+	protected void createHiveDatabase(Database hiveDatabase, boolean ignoreIfExists)
+			throws DatabaseAlreadyExistException, CatalogException {
+		try {
+			client.createDatabase(hiveDatabase);
+		} catch (AlreadyExistsException e) {
+			if (ignoreIfExists) {
+				return;
+			}
+			throw new DatabaseAlreadyExistException(catalogName, hiveDatabase.getName());
+		} catch (TException e) {
+			throw new CatalogException(String.format("Failed to create database %s", hiveDatabase.getName()), e);
+		}
+	}
+
+	/**
+	 * Alters the named database to the given Hive database if it exists.
+	 *
+	 * @throws DatabaseNotExistException if it does not exist and {@code ignoreIfNotExists} is false
+	 * @throws CatalogException on any client failure
+	 */
+	protected void alterHiveDatabase(String name, Database newHiveDatabase, boolean ignoreIfNotExists)
+			throws DatabaseNotExistException, CatalogException {
+		try {
+			if (!databaseExists(name)) {
+				if (!ignoreIfNotExists) {
+					throw new DatabaseNotExistException(catalogName, name);
+				}
+				return;
+			}
+
+			client.alterDatabase(name, newHiveDatabase);
+		} catch (TException e) {
+			throw new CatalogException(String.format("Failed to alter database %s", name), e);
+		}
+	}
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogDatabase.java
similarity index 55%
copy from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
copy to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogDatabase.java
index 959247a..a7cf5c2 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogDatabase.java
@@ -16,38 +16,62 @@
* limitations under the License.
*/
-package org.apache.flink.table.catalog;
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.util.StringUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
+import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * A generic catalog database implementation.
+ * A Hive catalog database implementation.
*/
-public class GenericCatalogDatabase implements CatalogDatabase {
+public class HiveCatalogDatabase implements CatalogDatabase {
+ // Property of the database
private final Map<String, String> properties;
+ // HDFS path of the database
+ private String location;
// Comment of the database
- private String comment = "This is a generic catalog database.";
+ private String comment = "This is a hive catalog database.";
+
+ public HiveCatalogDatabase() {
+ properties = new HashMap<>();
+ }
- public GenericCatalogDatabase(Map<String, String> properties) {
+ public HiveCatalogDatabase(Map<String, String> properties) {
this.properties = checkNotNull(properties, "properties cannot be null");
}
- public GenericCatalogDatabase(Map<String, String> properties, String comment) {
+ public HiveCatalogDatabase(Map<String, String> properties, String comment) {
this(properties);
this.comment = checkNotNull(comment, "comment cannot be null");
}
+ public HiveCatalogDatabase(Map<String, String> properties, String location, String comment) {
+ this(properties, comment);
+
+ checkArgument(!StringUtils.isNullOrWhitespaceOnly(location), "location cannot be null or empty");
+ this.location = location;
+ }
+
+ @Override
public Map<String, String> getProperties() {
return properties;
}
@Override
- public GenericCatalogDatabase copy() {
- return new GenericCatalogDatabase(new HashMap<>(properties), comment);
+ public String getComment() {
+ return comment;
+ }
+
+	/**
+	 * Returns a copy of this database with its own properties map and the same location and comment.
+	 */
+	@Override
+	public HiveCatalogDatabase copy() {
+		// Build the copy without going through the three-argument constructor: that constructor
+		// rejects a null/blank location, so copy() used to throw IllegalArgumentException for any
+		// instance created through one of the constructors that leave the location unset.
+		HiveCatalogDatabase duplicate = new HiveCatalogDatabase(new HashMap<>(properties), comment);
+		duplicate.location = location;
+		return duplicate;
 	}
@Override
@@ -57,15 +81,10 @@ public class GenericCatalogDatabase implements CatalogDatabase {
@Override
public Optional<String> getDetailedDescription() {
- return Optional.of("This is a generic catalog database stored in memory only");
+ return Optional.of("This is a Hive catalog database stored in memory only");
}
- public String getComment() {
- return this.comment;
+ public String getLocation() {
+ return location;
}
-
- public void setComment(String comment) {
- this.comment = comment;
- }
-
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDatabase.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogUtil.java
similarity index 53%
copy from flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDatabase.java
copy to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogUtil.java
index 89233da..e972ba4 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDatabase.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogUtil.java
@@ -16,38 +16,33 @@
* limitations under the License.
*/
-package org.apache.flink.table.catalog;
+package org.apache.flink.table.catalog.hive;
-import java.util.Map;
-import java.util.Optional;
+import org.apache.flink.table.catalog.CatalogDatabase;
+
+import org.apache.hadoop.hive.metastore.api.Database;
/**
- * Represents a database object in a catalog.
+ * Utils to convert meta objects between Flink and Hive for HiveCatalog.
*/
-public interface CatalogDatabase {
- /**
- * Get a map of properties associated with the database.
- */
- Map<String, String> getProperties();
+public class HiveCatalogUtil {
- /**
- * Get a deep copy of the CatalogDatabase instance.
- *
- * @return a copy of CatalogDatabase instance
- */
- CatalogDatabase copy();
+ private HiveCatalogUtil() {
+ }
- /**
- * Get a brief description of the database.
- *
- * @return an optional short description of the database
- */
- Optional<String> getDescription();
+ // ------ Utils ------
/**
- * Get a detailed description of the database.
- *
- * @return an optional long description of the database
+ * Creates a Hive database from CatalogDatabase.
*/
- Optional<String> getDetailedDescription();
+	public static Database createHiveDatabase(String dbName, CatalogDatabase db) {
+		// Reject anything that is not a HiveCatalogDatabase (including null) up front with a
+		// descriptive message, instead of failing with a bare ClassCastException or
+		// NullPointerException from the statements below.
+		if (!(db instanceof HiveCatalogDatabase)) {
+			throw new IllegalArgumentException(
+				String.format(
+					"Cannot create Hive database %s: expected a HiveCatalogDatabase but got %s",
+					dbName,
+					db == null ? "null" : db.getClass().getName()));
+		}
+
+		HiveCatalogDatabase hiveCatalogDatabase = (HiveCatalogDatabase) db;
+
+		// Argument order follows the original call: name, comment, location, properties.
+		return new Database(
+			dbName,
+			db.getComment(),
+			hiveCatalogDatabase.getLocation(),
+			hiveCatalogDatabase.getProperties());
+	}
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
index 2687699..76f8e08 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
@@ -95,7 +95,7 @@ public class GenericHiveMetastoreCatalogTest extends CatalogTestBase {
@Override
public String getBuiltInDefaultDatabase() {
- return GenericHiveMetastoreCatalog.DEFAULT_DB;
+ return HiveCatalogBase.DEFAULT_DB;
}
@Override
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
new file mode 100644
index 0000000..7b132c9
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTestBase;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+/**
+ * Test for HiveCatalog.
+ */
+public class HiveCatalogTest extends CatalogTestBase {
+ @BeforeClass
+ public static void init() throws IOException {
+ catalog = HiveTestUtils.createGenericHiveMetastoreCatalog();
+ catalog.open();
+ }
+
+ // =====================
+ // HiveCatalog doesn't support table operations yet.
+ // Thus, the following tests from CatalogTestBase that involve table operations are overridden as no-ops so they won't run against HiveCatalog
+ // =====================
+
+ // TODO: re-enable these tests once HiveCatalog supports table operations
+ @Test
+ public void testDropDb_DatabaseNotEmptyException() throws Exception {
+ }
+
+ @Test
+ public void testCreateTable_Streaming() throws Exception {
+ }
+
+ @Test
+ public void testCreateTable_Batch() throws Exception {
+ }
+
+ @Test
+ public void testCreateTable_DatabaseNotExistException() throws Exception {
+ }
+
+ @Test
+ public void testCreateTable_TableAlreadyExistException() throws Exception {
+ }
+
+ @Test
+ public void testCreateTable_TableAlreadyExist_ignored() throws Exception {
+ }
+
+ @Test
+ public void testGetTable_TableNotExistException() throws Exception {
+ }
+
+ @Test
+ public void testGetTable_TableNotExistException_NoDb() throws Exception {
+ }
+
+ @Test
+ public void testDropTable_nonPartitionedTable() throws Exception {
+ }
+
+ @Test
+ public void testDropTable_TableNotExistException() throws Exception {
+ }
+
+ @Test
+ public void testDropTable_TableNotExist_ignored() throws Exception {
+ }
+
+ @Test
+ public void testAlterTable() throws Exception {
+ }
+
+ @Test
+ public void testAlterTable_TableNotExistException() throws Exception {
+ }
+
+ @Test
+ public void testAlterTable_TableNotExist_ignored() throws Exception {
+ }
+
+ @Test
+ public void testRenameTable_nonPartitionedTable() throws Exception {
+ }
+
+ @Test
+ public void testRenameTable_TableNotExistException() throws Exception {
+ }
+
+ @Test
+ public void testRenameTable_TableNotExistException_ignored() throws Exception {
+ }
+
+ @Test
+ public void testRenameTable_TableAlreadyExistException() throws Exception {
+ }
+
+ @Test
+ public void testTableExists() throws Exception {
+ }
+
+ // ------ utils ------
+
+ @Override
+ public String getBuiltInDefaultDatabase() {
+ return HiveCatalogBase.DEFAULT_DB;
+ }
+
+ @Override
+ public CatalogDatabase createDb() {
+ return new HiveCatalogDatabase(
+ new HashMap<String, String>() {{
+ put("k1", "v1");
+ }},
+ TEST_COMMENT
+ );
+ }
+
+ @Override
+ public CatalogDatabase createAnotherDb() {
+ return new HiveCatalogDatabase(
+ new HashMap<String, String>() {{
+ put("k2", "v2");
+ }},
+ TEST_COMMENT
+ );
+ }
+
+ @Override
+ public CatalogTable createTable() {
+ // TODO: implement this once HiveCatalog supports table operations
+ return null;
+ }
+
+ @Override
+ public CatalogTable createAnotherTable() {
+ // TODO: implement this once HiveCatalog supports table operations
+ return null;
+ }
+
+ @Override
+ public CatalogTable createStreamingTable() {
+ // TODO: implement this once HiveCatalog supports table operations
+ return null;
+ }
+
+ @Override
+ public CatalogTable createPartitionedTable() {
+ // TODO: implement this once HiveCatalog supports table operations
+ return null;
+ }
+
+ @Override
+ public CatalogTable createAnotherPartitionedTable() {
+ // TODO: implement this once HiveCatalog supports table operations
+ return null;
+ }
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
index 959247a..d5df274 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
@@ -29,9 +29,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class GenericCatalogDatabase implements CatalogDatabase {
private final Map<String, String> properties;
- // Comment of the database
private String comment = "This is a generic catalog database.";
+ public GenericCatalogDatabase() {
+ this.properties = new HashMap<>();
+ }
+
public GenericCatalogDatabase(Map<String, String> properties) {
this.properties = checkNotNull(properties, "properties cannot be null");
}
@@ -41,11 +44,17 @@ public class GenericCatalogDatabase implements CatalogDatabase {
this.comment = checkNotNull(comment, "comment cannot be null");
}
+ @Override
public Map<String, String> getProperties() {
return properties;
}
@Override
+ public String getComment() {
+ return this.comment;
+ }
+
+ @Override
public GenericCatalogDatabase copy() {
return new GenericCatalogDatabase(new HashMap<>(properties), comment);
}
@@ -59,13 +68,4 @@ public class GenericCatalogDatabase implements CatalogDatabase {
public Optional<String> getDetailedDescription() {
return Optional.of("This is a generic catalog database stored in memory only");
}
-
- public String getComment() {
- return this.comment;
- }
-
- public void setComment(String comment) {
- this.comment = comment;
- }
-
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDatabase.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDatabase.java
index 89233da..8b14ff8 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDatabase.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDatabase.java
@@ -31,6 +31,13 @@ public interface CatalogDatabase {
Map<String, String> getProperties();
/**
+ * Get the comment of the database.
+ *
+ * @return comment of the database
+ */
+ String getComment();
+
+ /**
* Get a deep copy of the CatalogDatabase instance.
*
* @return a copy of CatalogDatabase instance
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
index 8117c82..470a9a9 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
@@ -109,11 +109,16 @@ public abstract class CatalogTestBase {
@Test
public void testSetCurrentDatabase() throws Exception {
assertEquals(getBuiltInDefaultDatabase(), catalog.getCurrentDatabase());
+
catalog.createDatabase(db2, createDb(), true);
catalog.setCurrentDatabase(db2);
+
assertEquals(db2, catalog.getCurrentDatabase());
+
catalog.setCurrentDatabase(getBuiltInDefaultDatabase());
+
assertEquals(getBuiltInDefaultDatabase(), catalog.getCurrentDatabase());
+
catalog.dropDatabase(db2, false);
}