You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/05/14 17:28:52 UTC

[flink] branch master updated: [FLINK-12469][table] Clean up catalog API on default/current database

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c3ac2f  [FLINK-12469][table] Clean up catalog API on default/current database
1c3ac2f is described below

commit 1c3ac2f82b707ae12556a5883980da8050510244
Author: Xuefu Zhang <xu...@alibaba-inc.com>
AuthorDate: Thu May 9 10:37:12 2019 -0700

    [FLINK-12469][table] Clean up catalog API on default/current database
    
    This PR separates concepts of "current database of a user session" and "default database of a catalog".
    
    This closes #8390.
---
 .../catalog/hive/GenericHiveMetastoreCatalog.java  |  4 +++
 .../flink/table/catalog/hive/HiveCatalogBase.java  | 34 ++++++++++------------
 .../hive/GenericHiveMetastoreCatalogTest.java      |  7 -----
 .../flink/table/catalog/hive/HiveCatalogTest.java  |  5 ----
 .../table/catalog/GenericInMemoryCatalog.java      | 29 +++++++-----------
 .../table/catalog/GenericInMemoryCatalogTest.java  |  5 ----
 .../org/apache/flink/table/catalog/Catalog.java    | 17 +++--------
 .../flink/table/catalog/CatalogTestBase.java       | 34 ++--------------------
 8 files changed, 37 insertions(+), 98 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
index c6d14f4..fa0b86a 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
@@ -79,6 +79,10 @@ public class GenericHiveMetastoreCatalog extends HiveCatalogBase {
 
 	public GenericHiveMetastoreCatalog(String catalogName, HiveConf hiveConf) {
 		super(catalogName, hiveConf);
+	}
+
+	public GenericHiveMetastoreCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf) {
+		super(catalogName, defaultDatabase, hiveConf);
 
 		LOG.info("Created GenericHiveMetastoreCatalog '{}'", catalogName);
 	}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
index 5c44ebc..f4eecf1 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
@@ -55,23 +55,27 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public abstract class HiveCatalogBase implements Catalog {
 	private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogBase.class);
-
-	public static final String DEFAULT_DB = "default";
+	private static final String DEFAULT_DB = "default";
 
 	protected final String catalogName;
 	protected final HiveConf hiveConf;
 
-	protected String currentDatabase = DEFAULT_DB;
+	private final String defaultDatabase;
 	protected IMetaStoreClient client;
 
 	public HiveCatalogBase(String catalogName, String hivemetastoreURI) {
-		this(catalogName, getHiveConf(hivemetastoreURI));
+		this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI));
 	}
 
 	public HiveCatalogBase(String catalogName, HiveConf hiveConf) {
+		this(catalogName, DEFAULT_DB, hiveConf);
+	}
+
+	public HiveCatalogBase(String catalogName, String defaultDatabase, HiveConf hiveConf) {
 		checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty");
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), "defaultDatabase cannot be null or empty");
 		this.catalogName = catalogName;
-
+		this.defaultDatabase = defaultDatabase;
 		this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null");
 	}
 
@@ -130,6 +134,11 @@ public abstract class HiveCatalogBase implements Catalog {
 			client = getMetastoreClient(hiveConf);
 			LOG.info("Connected to Hive metastore");
 		}
+
+		if (!databaseExists(defaultDatabase)) {
+			throw new CatalogException(String.format("Configured default database %s doesn't exist in catalog %s.",
+				defaultDatabase, catalogName));
+		}
 	}
 
 	@Override
@@ -144,19 +153,8 @@ public abstract class HiveCatalogBase implements Catalog {
 	// ------ databases ------
 
 	@Override
-	public String getCurrentDatabase() throws CatalogException {
-		return currentDatabase;
-	}
-
-	@Override
-	public void setCurrentDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
-		checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
-
-		if (!databaseExists(databaseName)) {
-			throw new DatabaseNotExistException(catalogName, databaseName);
-		}
-
-		currentDatabase = databaseName;
+	public String getDefaultDatabase() throws CatalogException {
+		return defaultDatabase;
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
index ca7df2e..24e3439 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
@@ -90,13 +90,6 @@ public class GenericHiveMetastoreCatalogTest extends CatalogTestBase {
 		checkEquals(table, (CatalogTable) catalog.getTable(path1));
 	}
 
-	// ------ utils ------
-
-	@Override
-	public String getBuiltInDefaultDatabase() {
-		return HiveCatalogBase.DEFAULT_DB;
-	}
-
 	@Override
 	public CatalogDatabase createDb() {
 		return new GenericCatalogDatabase(
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
index 2b7e397..2f3cd7a 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
@@ -92,11 +92,6 @@ public class HiveCatalogTest extends CatalogTestBase {
 	// ------ utils ------
 
 	@Override
-	public String getBuiltInDefaultDatabase() {
-		return HiveCatalogBase.DEFAULT_DB;
-	}
-
-	@Override
 	public CatalogDatabase createDb() {
 		return new HiveCatalogDatabase(
 			new HashMap<String, String>() {{
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
index 96f9e43..72797d9 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
@@ -48,10 +48,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * A generic catalog implementation that holds all meta objects in memory.
  */
 public class GenericInMemoryCatalog implements Catalog {
+	private static final String DEFAULT_DB = "default";
 
-	public static final String DEFAULT_DB = "default";
-
-	private String currentDatabase = DEFAULT_DB;
+	private final String defaultDatabase;
 
 	private final String catalogName;
 	private final Map<String, CatalogDatabase> databases;
@@ -65,11 +64,17 @@ public class GenericInMemoryCatalog implements Catalog {
 	private final Map<ObjectPath, Map<CatalogPartitionSpec, CatalogColumnStatistics>> partitionColumnStats;
 
 	public GenericInMemoryCatalog(String name) {
+		this(name, DEFAULT_DB);
+	}
+
+	public GenericInMemoryCatalog(String name, String defaultDatabase) {
 		checkArgument(!StringUtils.isNullOrWhitespaceOnly(name), "name cannot be null or empty");
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), "defaultDatabase cannot be null or empty");
 
 		this.catalogName = name;
+		this.defaultDatabase = defaultDatabase;
 		this.databases = new LinkedHashMap<>();
-		this.databases.put(DEFAULT_DB, new GenericCatalogDatabase(new HashMap<>()));
+		this.databases.put(defaultDatabase, new GenericCatalogDatabase(new HashMap<>()));
 		this.tables = new LinkedHashMap<>();
 		this.functions = new LinkedHashMap<>();
 		this.partitions = new LinkedHashMap<>();
@@ -81,7 +86,6 @@ public class GenericInMemoryCatalog implements Catalog {
 
 	@Override
 	public void open() {
-
 	}
 
 	@Override
@@ -92,19 +96,8 @@ public class GenericInMemoryCatalog implements Catalog {
 	// ------ databases ------
 
 	@Override
-	public String getCurrentDatabase() {
-		return currentDatabase;
-	}
-
-	@Override
-	public void setCurrentDatabase(String databaseName) throws DatabaseNotExistException {
-		checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
-
-		if (!databaseExists(databaseName)) {
-			throw new DatabaseNotExistException(catalogName, databaseName);
-		}
-
-		currentDatabase = databaseName;
+	public String getDefaultDatabase() {
+		return defaultDatabase;
 	}
 
 	@Override
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index 89cc0ee..6992da5 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -628,11 +628,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 	// ------ utilities ------
 
 	@Override
-	public String getBuiltInDefaultDatabase() {
-		return GenericInMemoryCatalog.DEFAULT_DB;
-	}
-
-	@Override
 	public CatalogDatabase createDb() {
 		return new GenericCatalogDatabase(
 			new HashMap<String, String>() {{
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
index 917cf7a..cd5be1a 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
@@ -60,23 +60,14 @@ public interface Catalog {
 	// ------ databases ------
 
 	/**
-	 * Get the name of the current database of this type of catalog. This is used when users refers an object in the catalog
-	 * without specifying a database. For example, the current db in a Hive Metastore is 'default' by default.
+	 * Get the name of the default database for this catalog. The default database will be the current database for
+	 * the catalog when user's session doesn't specify a current database. The value probably comes from configuration,
+	 * will not change for the life time of the catalog instance.
 	 *
 	 * @return the name of the current database
 	 * @throws CatalogException in case of any runtime exception
 	 */
-	String getCurrentDatabase() throws CatalogException;
-
-	/**
-	 * Set the database with the given name as the current database. A current database is used when users refers an object
-	 * in the catalog without specifying a database.
-	 *
-	 * @param databaseName	the name of the database
-	 * @throws DatabaseNotExistException if the given database doesn't exist in the catalog
-	 * @throws CatalogException in case of any runtime exception
-	 */
-	void setCurrentDatabase(String databaseName) throws DatabaseNotExistException, CatalogException;
+	String getDefaultDatabase() throws CatalogException;
 
 	/**
 	 * Get the names of all databases in this catalog.
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
index ec37104..e66525d 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
@@ -111,29 +111,6 @@ public abstract class CatalogTestBase {
 	}
 
 	@Test
-	public void testSetCurrentDatabase() throws Exception {
-		assertEquals(getBuiltInDefaultDatabase(), catalog.getCurrentDatabase());
-
-		catalog.createDatabase(db2, createDb(), true);
-		catalog.setCurrentDatabase(db2);
-
-		assertEquals(db2, catalog.getCurrentDatabase());
-
-		catalog.setCurrentDatabase(getBuiltInDefaultDatabase());
-
-		assertEquals(getBuiltInDefaultDatabase(), catalog.getCurrentDatabase());
-
-		catalog.dropDatabase(db2, false);
-	}
-
-	@Test
-	public void testSetCurrentDatabaseNegative() throws Exception {
-		exception.expect(DatabaseNotExistException.class);
-		exception.expectMessage("Database " + this.nonExistentDatabase + " does not exist in Catalog");
-		catalog.setCurrentDatabase(this.nonExistentDatabase);
-	}
-
-	@Test
 	public void testCreateDb_DatabaseAlreadyExistException() throws Exception {
 		catalog.createDatabase(db1, createDb(), false);
 
@@ -150,13 +127,13 @@ public abstract class CatalogTestBase {
 
 		assertTrue(catalog.getDatabase(db1).getProperties().entrySet().containsAll(cd1.getProperties().entrySet()));
 		assertEquals(2, dbs.size());
-		assertEquals(new HashSet<>(Arrays.asList(db1, catalog.getCurrentDatabase())), new HashSet<>(dbs));
+		assertEquals(new HashSet<>(Arrays.asList(db1, catalog.getDefaultDatabase())), new HashSet<>(dbs));
 
 		catalog.createDatabase(db1, createAnotherDb(), true);
 
 		assertTrue(catalog.getDatabase(db1).getProperties().entrySet().containsAll(cd1.getProperties().entrySet()));
 		assertEquals(2, dbs.size());
-		assertEquals(new HashSet<>(Arrays.asList(db1, catalog.getCurrentDatabase())), new HashSet<>(dbs));
+		assertEquals(new HashSet<>(Arrays.asList(db1, catalog.getDefaultDatabase())), new HashSet<>(dbs));
 	}
 
 	@Test
@@ -586,13 +563,6 @@ public abstract class CatalogTestBase {
 	// ------ utilities ------
 
 	/**
-	 * Get the built-in default database of the specific catalog implementation.
-	 *
-	 * @return The built-in default database name
-	 */
-	public abstract String getBuiltInDefaultDatabase();
-
-	/**
 	 * Create a CatalogDatabase instance by specific catalog implementation.
 	 *
 	 * @return a CatalogDatabase instance