You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/05/27 01:52:49 UTC

[flink] branch release-1.11 updated: [FLINK-17756][table] Drop table/view shouldn't take effect on each other

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 23d4675  [FLINK-17756][table] Drop table/view shouldn't take effect on each other
23d4675 is described below

commit 23d4675520e3edbd3171374ed7a81444965b3d96
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Wed May 27 09:51:23 2020 +0800

    [FLINK-17756][table] Drop table/view shouldn't take effect on each other
    
    
    This closes #12314
---
 .../table/api/internal/TableEnvironmentImpl.java   |  2 +-
 .../apache/flink/table/catalog/CatalogManager.java | 82 +++++++++++++++------
 .../table/planner/catalog/CatalogTableITCase.scala | 69 ++++++++++++++++++
 .../flink/table/api/internal/TableEnvImpl.scala    |  2 +-
 .../flink/table/catalog/CatalogTableITCase.scala   | 85 ++++++++++++++++++++--
 5 files changed, 207 insertions(+), 33 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index c10552b..6fc2e86 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -889,7 +889,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 						dropViewOperation.getViewIdentifier(),
 						dropViewOperation.isIfExists());
 			} else {
-				catalogManager.dropTable(
+				catalogManager.dropView(
 						dropViewOperation.getViewIdentifier(),
 						dropViewOperation.isIfExists());
 			}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index 4b6632e..44dc6c3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -336,16 +336,12 @@ public final class CatalogManager {
 	 * @return table that the path points to.
 	 */
 	public Optional<TableLookupResult> getTable(ObjectIdentifier objectIdentifier) {
-		try {
-			CatalogBaseTable temporaryTable = temporaryTables.get(objectIdentifier);
-			if (temporaryTable != null) {
-				return Optional.of(TableLookupResult.temporary(temporaryTable));
-			} else {
-				return getPermanentTable(objectIdentifier);
-			}
-		} catch (TableNotExistException ignored) {
+		CatalogBaseTable temporaryTable = temporaryTables.get(objectIdentifier);
+		if (temporaryTable != null) {
+			return Optional.of(TableLookupResult.temporary(temporaryTable));
+		} else {
+			return getPermanentTable(objectIdentifier);
 		}
-		return Optional.empty();
 	}
 
 	/**
@@ -367,13 +363,15 @@ public final class CatalogManager {
 		return Optional.empty();
 	}
 
-	private Optional<TableLookupResult> getPermanentTable(ObjectIdentifier objectIdentifier)
-			throws TableNotExistException {
+	private Optional<TableLookupResult> getPermanentTable(ObjectIdentifier objectIdentifier) {
 		Catalog currentCatalog = catalogs.get(objectIdentifier.getCatalogName());
 		ObjectPath objectPath = objectIdentifier.toObjectPath();
-
-		if (currentCatalog != null && currentCatalog.tableExists(objectPath)) {
-			return Optional.of(TableLookupResult.permanent(currentCatalog.getTable(objectPath)));
+		if (currentCatalog != null) {
+			try {
+				return Optional.of(TableLookupResult.permanent(currentCatalog.getTable(objectPath)));
+			} catch (TableNotExistException e) {
+				// Ignore.
+			}
 		}
 		return Optional.empty();
 	}
@@ -682,20 +680,58 @@ public final class CatalogManager {
 	 * Drops a table in a given fully qualified path.
 	 *
 	 * @param objectIdentifier The fully qualified path of the table to drop.
-	 * @param ignoreIfNotExists If false exception will be thrown if the table or database or catalog to be altered
+	 * @param ignoreIfNotExists If false exception will be thrown if the table to drop
 	 *                          does not exist.
 	 */
 	public void dropTable(ObjectIdentifier objectIdentifier, boolean ignoreIfNotExists) {
-		if (temporaryTables.containsKey(objectIdentifier)) {
+		dropTableInternal(
+				objectIdentifier,
+				ignoreIfNotExists,
+				true);
+	}
+
+	/**
+	 * Drops a view in a given fully qualified path.
+	 *
+	 * @param objectIdentifier The fully qualified path of the view to drop.
+	 * @param ignoreIfNotExists If false exception will be thrown if the view to drop
+	 *                          does not exist.
+	 */
+	public void dropView(ObjectIdentifier objectIdentifier, boolean ignoreIfNotExists) {
+		dropTableInternal(
+				objectIdentifier,
+				ignoreIfNotExists,
+				false);
+	}
+
+	private void dropTableInternal(
+			ObjectIdentifier objectIdentifier,
+			boolean ignoreIfNotExists,
+			boolean isDropTable) {
+		Predicate<CatalogBaseTable> filter = isDropTable
+				? table -> table instanceof CatalogTable
+				: table -> table instanceof CatalogView;
+		// Same name temporary table or view exists.
+		if (filter.test(temporaryTables.get(objectIdentifier))) {
+			String tableOrView = isDropTable ? "table" : "view";
 			throw new ValidationException(String.format(
-				"Temporary table with identifier '%s' exists. Drop it first before removing the permanent table.",
-				objectIdentifier));
+					"Temporary %s with identifier '%s' exists. "
+							+ "Drop it first before removing the permanent %s.",
+					tableOrView, objectIdentifier, tableOrView));
+		}
+		final Optional<TableLookupResult> resultOpt = getPermanentTable(objectIdentifier);
+		if (resultOpt.isPresent() && filter.test(resultOpt.get().getTable())) {
+			execute(
+					(catalog, path) -> catalog.dropTable(path, ignoreIfNotExists),
+					objectIdentifier,
+					ignoreIfNotExists,
+					"DropTable");
+		} else if (!ignoreIfNotExists) {
+			String tableOrView = isDropTable ? "Table" : "View";
+			throw new ValidationException(String.format(
+					"%s with identifier '%s' does not exist.",
+					tableOrView, objectIdentifier.asSummaryString()));
 		}
-		execute(
-			(catalog, path) -> catalog.dropTable(path, ignoreIfNotExists),
-			objectIdentifier,
-			ignoreIfNotExists,
-			"DropTable");
 	}
 
 	/**
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
index 45db348..ae721a9 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
@@ -880,6 +880,75 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase {
   }
 
   @Test
+  def testDropTableSameNameWithTemporaryTable(): Unit = {
+    val createTable1 =
+      """
+        |create table t1(
+        |  a bigint,
+        |  b bigint,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION'
+        |)
+      """.stripMargin
+    val createTable2 =
+      """
+        |create temporary table t1(
+        |  a bigint,
+        |  b bigint,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION'
+        |)
+      """.stripMargin
+    tableEnv.executeSql(createTable1)
+    tableEnv.executeSql(createTable2)
+
+    expectedEx.expect(classOf[ValidationException])
+    expectedEx.expectMessage("Temporary table with identifier "
+      + "'`default_catalog`.`default_database`.`t1`' exists. "
+      + "Drop it first before removing the permanent table.")
+    tableEnv.executeSql("drop table t1")
+  }
+
+  @Test
+  def testDropViewSameNameWithTable(): Unit = {
+    val createTable1 =
+      """
+        |create table t1(
+        |  a bigint,
+        |  b bigint,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION'
+        |)
+      """.stripMargin
+    tableEnv.executeSql(createTable1)
+
+    expectedEx.expect(classOf[ValidationException])
+    expectedEx.expectMessage("View with identifier "
+      + "'default_catalog.default_database.t1' does not exist.")
+    tableEnv.executeSql("drop view t1")
+  }
+
+  @Test
+  def testDropViewSameNameWithTableIfNotExists(): Unit = {
+    val createTable1 =
+      """
+        |create table t1(
+        |  a bigint,
+        |  b bigint,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION'
+        |)
+      """.stripMargin
+    tableEnv.executeSql(createTable1)
+    tableEnv.executeSql("drop view if exists t1")
+    assert(tableEnv.listTables().sameElements(Array("t1")))
+  }
+
+  @Test
   def testAlterTable(): Unit = {
     val ddl1 =
       """
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index b57f906..5d97ede 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -765,7 +765,7 @@ abstract class TableEnvImpl(
             dropViewOperation.getViewIdentifier,
             dropViewOperation.isIfExists)
         } else {
-          catalogManager.dropTable(
+          catalogManager.dropView(
             dropViewOperation.getViewIdentifier,
             dropViewOperation.isIfExists)
         }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
index 23a6cd9..ce50b33 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
@@ -23,15 +23,16 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.bridge.scala.{BatchTableEnvironment, StreamTableEnvironment}
 import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment, ValidationException}
 import org.apache.flink.table.factories.utils.TestCollectionTableFactory
+import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.types.Row
 
 import org.junit.Assert.{assertEquals, fail}
+import org.junit.rules.ExpectedException
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.{Before, Ignore, Test}
+import org.junit.{Before, Ignore, Rule, Test}
 
 import java.util
-import org.apache.flink.test.util.AbstractTestBase
 
 import scala.collection.JavaConversions._
 
@@ -52,18 +53,17 @@ class CatalogTableITCase(isStreaming: Boolean) extends AbstractTestBase {
       toRow(3, "c")
   )
 
-  private val DIM_DATA = List(
-    toRow(1, "aDim"),
-    toRow(2, "bDim"),
-    toRow(3, "cDim")
-  )
-
   implicit def rowOrdering: Ordering[Row] = Ordering.by((r : Row) => {
     val builder = new StringBuilder
     0 until r.getArity foreach(idx => builder.append(r.getField(idx)))
     builder.toString()
   })
 
+  var _expectedEx: ExpectedException = ExpectedException.none
+
+  @Rule
+  def expectedEx: ExpectedException = _expectedEx
+
   @Before
   def before(): Unit = {
     batchExec.setParallelism(4)
@@ -538,6 +538,75 @@ class CatalogTableITCase(isStreaming: Boolean) extends AbstractTestBase {
   }
 
   @Test
+  def testDropTableSameNameWithTemporaryTable(): Unit = {
+    val createTable1 =
+      """
+        |create table t1(
+        |  a bigint,
+        |  b bigint,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION'
+        |)
+      """.stripMargin
+    val createTable2 =
+      """
+        |create temporary table t1(
+        |  a bigint,
+        |  b bigint,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION'
+        |)
+      """.stripMargin
+    tableEnv.executeSql(createTable1)
+    tableEnv.executeSql(createTable2)
+
+    expectedEx.expect(classOf[ValidationException])
+    expectedEx.expectMessage("Temporary table with identifier "
+      + "'`default_catalog`.`default_database`.`t1`' exists. "
+      + "Drop it first before removing the permanent table.")
+    tableEnv.executeSql("drop table t1")
+  }
+
+  @Test
+  def testDropViewSameNameWithTable(): Unit = {
+    val createTable1 =
+      """
+        |create table t1(
+        |  a bigint,
+        |  b bigint,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION'
+        |)
+      """.stripMargin
+    tableEnv.executeSql(createTable1)
+
+    expectedEx.expect(classOf[ValidationException])
+    expectedEx.expectMessage("View with identifier "
+      + "'default_catalog.default_database.t1' does not exist.")
+    tableEnv.executeSql("drop view t1")
+  }
+
+  @Test
+  def testDropViewSameNameWithTableIfNotExists(): Unit = {
+    val createTable1 =
+      """
+        |create table t1(
+        |  a bigint,
+        |  b bigint,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION'
+        |)
+      """.stripMargin
+    tableEnv.executeSql(createTable1)
+    tableEnv.executeSql("drop view if exists t1")
+    assert(tableEnv.listTables().sameElements(Array("t1")))
+  }
+
+  @Test
   def testAlterTable(): Unit = {
     val ddl1 =
       """