You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/05/17 04:48:30 UTC

[flink] branch master updated: [FLINK-12452][table][hive] alterTable() should ensure existing base table and the new one are of the same type

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ba30707  [FLINK-12452][table][hive] alterTable() should ensure existing base table and the new one are of the same type
ba30707 is described below

commit ba307072011ed2c44f5f4937e12be5da5e6bf805
Author: bowen.li <bo...@gmail.com>
AuthorDate: Wed May 15 14:38:25 2019 -0700

    [FLINK-12452][table][hive] alterTable() should ensure existing base table and the new one are of the same type
    
    Currently, none of the catalogs check whether the existing base table and the new one are of the same type in alterTable(), e.g. the existing table is a view but the user tries to replace it with a table. This PR adds such checks to all catalogs.
    
    This closes #8458.
---
 .../flink/table/catalog/hive/HiveCatalogBase.java  | 25 ++++++++++++++--
 .../table/catalog/GenericInMemoryCatalog.java      | 12 +++++---
 .../table/catalog/GenericInMemoryCatalogTest.java  | 23 +++++++++++++++
 .../flink/table/catalog/CatalogTestBase.java       | 33 ++++++++++++++++++++++
 4 files changed, 87 insertions(+), 6 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
index 031a61a..4180308 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
@@ -21,6 +21,8 @@ package org.apache.flink.table.catalog.hive;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
@@ -320,13 +322,32 @@ public abstract class HiveCatalogBase implements Catalog {
 				return;
 			}
 
-			// TODO: [FLINK-12452] alterTable() in all catalogs should ensure existing base table and the new one are of the same type
+			Table oldTable = getHiveTable(tablePath);
+			TableType oldTableType = TableType.valueOf(oldTable.getTableType());
+
+			if (oldTableType == TableType.VIRTUAL_VIEW) {
+				if (!(newCatalogTable instanceof CatalogView)) {
+					throw new CatalogException(
+						String.format("Table types don't match. The existing table is a view, but the new catalog base table is not."));
+				}
+				// Else, do nothing
+			} else if ((oldTableType == TableType.MANAGED_TABLE)) {
+				if (!(newCatalogTable instanceof CatalogTable)) {
+					throw new CatalogException(
+						String.format("Table types don't match. The existing table is a table, but the new catalog base table is not."));
+				}
+				// Else, do nothing
+			} else {
+				throw new CatalogException(
+					String.format("Hive table type '%s' is not supported yet.",
+						oldTableType.name()));
+			}
+
 			Table newTable = createHiveTable(tablePath, newCatalogTable);
 
 			// client.alter_table() requires a valid location
 			// thus, if new table doesn't have that, it reuses location of the old table
 			if (!newTable.getSd().isSetLocation()) {
-				Table oldTable = getHiveTable(tablePath);
 				newTable.getSd().setLocation(oldTable.getSd().getLocation());
 			}
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
index 72797d9..6a1f5df 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
@@ -209,11 +209,15 @@ public class GenericInMemoryCatalog implements Catalog {
 		checkNotNull(tablePath);
 		checkNotNull(newTable);
 
-		// TODO: validate the new and old CatalogBaseTable must be of the same type. For example, this doesn't
-		//		allow alter a regular table to partitioned table, or alter a view to a table, and vice versa.
-		//		And also add unit tests.
-
 		if (tableExists(tablePath)) {
+			CatalogBaseTable oldTable = tables.get(tablePath);
+
+			if (oldTable.getClass() != newTable.getClass()) {
+				throw new CatalogException(
+					String.format("Table classes don't match. Existing table is '%s' and new table is '%s'. They should be of the same class.",
+						oldTable.getClass().getName(), newTable.getClass().getName()));
+			}
+
 			tables.put(tablePath, newTable.copy());
 		} else if (!ignoreIfNotExists) {
 			throw new TableNotExistException(catalogName, tablePath);
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index 6992da5..a013aed 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.catalog;
 
+import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
@@ -108,6 +109,28 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
 		assertFalse(catalog.partitionExists(path1, catalogPartitionSpec));
 	}
 
+	@Test
+	public void testAlterTable_alterTableWithView() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createTable(path1, createTable(), false);
+
+		exception.expect(CatalogException.class);
+		exception.expectMessage("Existing table is 'org.apache.flink.table.catalog.GenericCatalogTable' " +
+			"and new table is 'org.apache.flink.table.catalog.GenericCatalogView'. They should be of the same class.");
+		catalog.alterTable(path1, createView(), false);
+	}
+
+	@Test
+	public void testAlterTable_alterViewWithTable() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createTable(path1, createView(), false);
+
+		exception.expect(CatalogException.class);
+		exception.expectMessage("Existing table is 'org.apache.flink.table.catalog.GenericCatalogView' " +
+			"and new table is 'org.apache.flink.table.catalog.GenericCatalogTable'. They should be of the same class.");
+		catalog.alterTable(path1, createTable(), false);
+	}
+
 	// ------ partitions ------
 
 	@Test
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
index 780251c..0d9d5df 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.catalog;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
@@ -360,6 +361,18 @@ public abstract class CatalogTestBase {
 		catalog.alterTable(path1, newTable, false);
 
 		checkEquals(newTable, (CatalogTable) catalog.getTable(path1));
+
+		// View
+		CatalogView view = createView();
+		catalog.createTable(path3, view, false);
+
+		checkEquals(view, (CatalogView) catalog.getTable(path3));
+
+		CatalogView newView = createAnotherView();
+		catalog.alterTable(path3, newView, false);
+
+		assertNotEquals(view, catalog.getTable(path3));
+		checkEquals(newView, (CatalogView) catalog.getTable(path3));
 	}
 
 	@Test
@@ -378,6 +391,26 @@ public abstract class CatalogTestBase {
 	}
 
 	@Test
+	public void testAlterTable_alterTableWithView() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createTable(path1, createTable(), false);
+
+		exception.expect(CatalogException.class);
+		exception.expectMessage("The existing table is a table, but the new catalog base table is not.");
+		catalog.alterTable(path1, createView(), false);
+	}
+
+	@Test
+	public void testAlterTable_alterViewWithTable() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createTable(path1, createView(), false);
+
+		exception.expect(CatalogException.class);
+		exception.expectMessage("The existing table is a view, but the new catalog base table is not.");
+		catalog.alterTable(path1, createTable(), false);
+	}
+
+	@Test
 	public void testRenameTable_nonPartitionedTable() throws Exception {
 		catalog.createDatabase(db1, createDb(), false);
 		CatalogTable table = createTable();