You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/05/14 23:41:17 UTC

[flink] branch master updated: [FLINK-12505][hive] Unify database operations to HiveCatalogBase from its subclasses

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f7ecfdb  [FLINK-12505][hive] Unify database operations to HiveCatalogBase from its subclasses
f7ecfdb is described below

commit f7ecfdbcdf0f1ab17097ebbe1d170b984961bf60
Author: bowen.li <bo...@gmail.com>
AuthorDate: Mon May 13 16:07:21 2019 -0700

    [FLINK-12505][hive] Unify database operations to HiveCatalogBase from its subclasses
    
    Currently each subclass of HiveCatalogBase has its own impl for catalog related database APIs and its own util class to support such impl. However, they share many common code. This PR unifies their common parts to make the code cleaner and more readable.
    
    This closes #8433.
---
 .../catalog/hive/GenericHiveMetastoreCatalog.java  | 27 +++++------
 .../flink/table/catalog/hive/HiveCatalog.java      | 26 +++++-----
 .../flink/table/catalog/hive/HiveCatalogBase.java  | 56 +++++++++++++++-------
 .../flink/table/catalog/hive/HiveCatalogUtil.java  | 47 ------------------
 .../hive/util/GenericHiveMetastoreCatalogUtil.java | 49 -------------------
 5 files changed, 63 insertions(+), 142 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
index fa0b86a..8558576 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
@@ -31,7 +31,6 @@ import org.apache.flink.table.catalog.GenericCatalogTable;
 import org.apache.flink.table.catalog.GenericCatalogView;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
@@ -40,7 +39,6 @@ import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
 import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
-import org.apache.flink.table.catalog.hive.util.GenericHiveMetastoreCatalogUtil;
 import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
@@ -90,22 +88,21 @@ public class GenericHiveMetastoreCatalog extends HiveCatalogBase {
 	// ------ databases ------
 
 	@Override
-	public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
-		Database hiveDb = getHiveDatabase(databaseName);
-
-		return new GenericCatalogDatabase(hiveDb.getParameters(), hiveDb.getDescription());
-	}
-
-	@Override
-	public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
-			throws DatabaseAlreadyExistException, CatalogException {
-		createHiveDatabase(GenericHiveMetastoreCatalogUtil.createHiveDatabase(name, database), ignoreIfExists);
+	protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) {
+		return new GenericCatalogDatabase(
+			hiveDatabase.getParameters(),
+			hiveDatabase.getDescription()
+		);
 	}
 
 	@Override
-	public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
-			throws DatabaseNotExistException, CatalogException {
-		alterHiveDatabase(name, GenericHiveMetastoreCatalogUtil.createHiveDatabase(name, newDatabase), ignoreIfNotExists);
+	protected Database createHiveDatabase(String databaseName, CatalogDatabase catalogDatabase) {
+		return new Database(
+			databaseName,
+			catalogDatabase.getComment(),
+			// HDFS location URI which GenericCatalogDatabase shouldn't care
+			null,
+			catalogDatabase.getProperties());
 	}
 
 	// ------ tables and views------
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 5ec6fd8..0619aa3 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
@@ -76,23 +75,22 @@ public class HiveCatalog extends HiveCatalogBase {
 	// ------ databases ------
 
 	@Override
-	public CatalogDatabase getDatabase(String databaseName)
-			throws DatabaseNotExistException, CatalogException {
-		Database hiveDb = getHiveDatabase(databaseName);
-
+	protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) {
 		return new HiveCatalogDatabase(
-			hiveDb.getParameters(), hiveDb.getLocationUri(), hiveDb.getDescription());
-	}
-
-	@Override
-	public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
-			throws DatabaseAlreadyExistException, CatalogException {
-		createHiveDatabase(HiveCatalogUtil.createHiveDatabase(name, database), ignoreIfExists);
+			hiveDatabase.getParameters(),
+			hiveDatabase.getLocationUri(),
+			hiveDatabase.getDescription());
 	}
 
 	@Override
-	public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException {
-		alterHiveDatabase(name, HiveCatalogUtil.createHiveDatabase(name, newDatabase), ignoreIfNotExists);
+	protected Database createHiveDatabase(String databaseName, CatalogDatabase catalogDatabase) {
+		HiveCatalogDatabase hiveCatalogDatabase = (HiveCatalogDatabase) catalogDatabase;
+
+		return new Database(
+			databaseName,
+			catalogDatabase.getComment(),
+			hiveCatalogDatabase.getLocation(),
+			hiveCatalogDatabase.getProperties());
 	}
 
 	// ------ tables and views------
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
index f4eecf1..27ca7a1 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog.hive;
 
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
@@ -128,6 +129,23 @@ public abstract class HiveCatalogBase implements Catalog {
 	 */
 	protected abstract Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table);
 
+	/**
+	 * Create a CatalogDatabase from a Hive database.
+	 *
+	 * @param hiveDatabase a Hive database
+	 * @return a CatalogDatabase
+	 */
+	protected abstract CatalogDatabase createCatalogDatabase(Database hiveDatabase);
+
+	/**
+	 * Create a Hive database from a CatalogDatabase.
+	 *
+	 * @param databaseName name of the database
+	 * @param catalogDatabase a CatalogDatabase
+	 * @return a Hive database
+	 */
+	protected abstract Database createHiveDatabase(String databaseName, CatalogDatabase catalogDatabase);
+
 	@Override
 	public void open() throws CatalogException {
 		if (client == null) {
@@ -158,6 +176,19 @@ public abstract class HiveCatalogBase implements Catalog {
 	}
 
 	@Override
+	public CatalogDatabase getDatabase(String databaseName)
+			throws DatabaseNotExistException, CatalogException {
+		try {
+			return createCatalogDatabase(client.getDatabase(databaseName));
+		} catch (NoSuchObjectException e) {
+			throw new DatabaseNotExistException(catalogName, databaseName);
+		} catch (TException e) {
+			throw new CatalogException(
+				String.format("Failed to get database %s from %s", databaseName, catalogName), e);
+		}
+	}
+
+	@Override
 	public List<String> listDatabases() throws CatalogException {
 		try {
 			return client.getAllDatabases();
@@ -194,35 +225,26 @@ public abstract class HiveCatalogBase implements Catalog {
 		}
 	}
 
-	protected Database getHiveDatabase(String databaseName) throws DatabaseNotExistException {
-		try {
-			return client.getDatabase(databaseName);
-		} catch (NoSuchObjectException e) {
-			throw new DatabaseNotExistException(catalogName, databaseName);
-		} catch (TException e) {
-			throw new CatalogException(
-				String.format("Failed to get database %s from %s", databaseName, catalogName), e);
-		}
-	}
-
-	protected void createHiveDatabase(Database hiveDatabase, boolean ignoreIfExists)
+	@Override
+	public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
 			throws DatabaseAlreadyExistException, CatalogException {
 		try {
-			client.createDatabase(hiveDatabase);
+			client.createDatabase(createHiveDatabase(name, database));
 		} catch (AlreadyExistsException e) {
 			if (!ignoreIfExists) {
-				throw new DatabaseAlreadyExistException(catalogName, hiveDatabase.getName());
+				throw new DatabaseAlreadyExistException(catalogName, name);
 			}
 		} catch (TException e) {
-			throw new CatalogException(String.format("Failed to create database %s", hiveDatabase.getName()), e);
+			throw new CatalogException(String.format("Failed to create database %s", name), e);
 		}
 	}
 
-	protected void alterHiveDatabase(String name, Database newHiveDatabase, boolean ignoreIfNotExists)
+	@Override
+	public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
 			throws DatabaseNotExistException, CatalogException {
 		try {
 			if (databaseExists(name)) {
-				client.alterDatabase(name, newHiveDatabase);
+				client.alterDatabase(name, createHiveDatabase(name, newDatabase));
 			} else if (!ignoreIfNotExists) {
 				throw new DatabaseNotExistException(catalogName, name);
 			}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogUtil.java
deleted file mode 100644
index 1a64a68..0000000
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogUtil.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog.hive;
-
-import org.apache.flink.table.catalog.CatalogDatabase;
-
-import org.apache.hadoop.hive.metastore.api.Database;
-
-/**
- * Utils to convert meta objects between Flink and Hive for HiveCatalog.
- */
-public class HiveCatalogUtil {
-
-	private HiveCatalogUtil() {
-	}
-
-	// ------ Utils ------
-
-	/**
-	 * Creates a Hive database from CatalogDatabase.
-	 */
-	public static Database createHiveDatabase(String dbName, CatalogDatabase db) {
-		HiveCatalogDatabase hiveCatalogDatabase = (HiveCatalogDatabase) db;
-
-		return new Database(
-			dbName,
-			db.getComment(),
-			hiveCatalogDatabase.getLocation(),
-			hiveCatalogDatabase.getProperties());
-	}
-}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/GenericHiveMetastoreCatalogUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/GenericHiveMetastoreCatalogUtil.java
deleted file mode 100644
index 7564e40..0000000
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/GenericHiveMetastoreCatalogUtil.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog.hive.util;
-
-import org.apache.flink.table.catalog.CatalogDatabase;
-
-import org.apache.hadoop.hive.metastore.api.Database;
-
-/**
- * Utils to convert meta objects between Flink and Hive for GenericHiveMetastoreCatalog.
- */
-public class GenericHiveMetastoreCatalogUtil {
-
-	private GenericHiveMetastoreCatalogUtil() {
-	}
-
-	// ------ Utils ------
-
-	/**
-	 * Creates a Hive database from a CatalogDatabase.
-	 *
-	 * @param databaseName name of the database
-	 * @param catalogDatabase the CatalogDatabase instance
-	 * @return a Hive database
-	 */
-	public static Database createHiveDatabase(String databaseName, CatalogDatabase catalogDatabase) {
-		return new Database(
-			databaseName,
-			catalogDatabase.getDescription().isPresent() ? catalogDatabase.getDescription().get() : null,
-			null,
-			catalogDatabase.getProperties());
-	}
-}