You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/05/27 01:52:49 UTC
[flink] branch release-1.11 updated: [FLINK-17756][table] Drop table/view shouldn't take effect on each other
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 23d4675 [FLINK-17756][table] Drop table/view shouldn't take effect on each other
23d4675 is described below
commit 23d4675520e3edbd3171374ed7a81444965b3d96
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Wed May 27 09:51:23 2020 +0800
[FLINK-17756][table] Drop table/view shouldn't take effect on each other
This closes #12314
---
.../table/api/internal/TableEnvironmentImpl.java | 2 +-
.../apache/flink/table/catalog/CatalogManager.java | 82 +++++++++++++++------
.../table/planner/catalog/CatalogTableITCase.scala | 69 ++++++++++++++++++
.../flink/table/api/internal/TableEnvImpl.scala | 2 +-
.../flink/table/catalog/CatalogTableITCase.scala | 85 ++++++++++++++++++++--
5 files changed, 207 insertions(+), 33 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index c10552b..6fc2e86 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -889,7 +889,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
dropViewOperation.getViewIdentifier(),
dropViewOperation.isIfExists());
} else {
- catalogManager.dropTable(
+ catalogManager.dropView(
dropViewOperation.getViewIdentifier(),
dropViewOperation.isIfExists());
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index 4b6632e..44dc6c3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -336,16 +336,12 @@ public final class CatalogManager {
* @return table that the path points to.
*/
public Optional<TableLookupResult> getTable(ObjectIdentifier objectIdentifier) {
- try {
- CatalogBaseTable temporaryTable = temporaryTables.get(objectIdentifier);
- if (temporaryTable != null) {
- return Optional.of(TableLookupResult.temporary(temporaryTable));
- } else {
- return getPermanentTable(objectIdentifier);
- }
- } catch (TableNotExistException ignored) {
+ CatalogBaseTable temporaryTable = temporaryTables.get(objectIdentifier);
+ if (temporaryTable != null) {
+ return Optional.of(TableLookupResult.temporary(temporaryTable));
+ } else {
+ return getPermanentTable(objectIdentifier);
}
- return Optional.empty();
}
/**
@@ -367,13 +363,15 @@ public final class CatalogManager {
return Optional.empty();
}
- private Optional<TableLookupResult> getPermanentTable(ObjectIdentifier objectIdentifier)
- throws TableNotExistException {
+ private Optional<TableLookupResult> getPermanentTable(ObjectIdentifier objectIdentifier) {
Catalog currentCatalog = catalogs.get(objectIdentifier.getCatalogName());
ObjectPath objectPath = objectIdentifier.toObjectPath();
-
- if (currentCatalog != null && currentCatalog.tableExists(objectPath)) {
- return Optional.of(TableLookupResult.permanent(currentCatalog.getTable(objectPath)));
+ if (currentCatalog != null) {
+ try {
+ return Optional.of(TableLookupResult.permanent(currentCatalog.getTable(objectPath)));
+ } catch (TableNotExistException e) {
+ // Ignore.
+ }
}
return Optional.empty();
}
@@ -682,20 +680,58 @@ public final class CatalogManager {
* Drops a table in a given fully qualified path.
*
* @param objectIdentifier The fully qualified path of the table to drop.
- * @param ignoreIfNotExists If false exception will be thrown if the table or database or catalog to be altered
+ * @param ignoreIfNotExists If false exception will be thrown if the table to drop
* does not exist.
*/
public void dropTable(ObjectIdentifier objectIdentifier, boolean ignoreIfNotExists) {
- if (temporaryTables.containsKey(objectIdentifier)) {
+ dropTableInternal(
+ objectIdentifier,
+ ignoreIfNotExists,
+ true);
+ }
+
+ /**
+ * Drops a view in a given fully qualified path.
+ *
+ * @param objectIdentifier The fully qualified path of the view to drop.
+ * @param ignoreIfNotExists If false exception will be thrown if the view to drop
+ * does not exist.
+ */
+ public void dropView(ObjectIdentifier objectIdentifier, boolean ignoreIfNotExists) {
+ dropTableInternal(
+ objectIdentifier,
+ ignoreIfNotExists,
+ false);
+ }
+
+ private void dropTableInternal(
+ ObjectIdentifier objectIdentifier,
+ boolean ignoreIfNotExists,
+ boolean isDropTable) {
+ Predicate<CatalogBaseTable> filter = isDropTable
+ ? table -> table instanceof CatalogTable
+ : table -> table instanceof CatalogView;
+ // Same name temporary table or view exists.
+ if (filter.test(temporaryTables.get(objectIdentifier))) {
+ String tableOrView = isDropTable ? "table" : "view";
throw new ValidationException(String.format(
- "Temporary table with identifier '%s' exists. Drop it first before removing the permanent table.",
- objectIdentifier));
+ "Temporary %s with identifier '%s' exists. "
+ + "Drop it first before removing the permanent %s.",
+ tableOrView, objectIdentifier, tableOrView));
+ }
+ final Optional<TableLookupResult> resultOpt = getPermanentTable(objectIdentifier);
+ if (resultOpt.isPresent() && filter.test(resultOpt.get().getTable())) {
+ execute(
+ (catalog, path) -> catalog.dropTable(path, ignoreIfNotExists),
+ objectIdentifier,
+ ignoreIfNotExists,
+ "DropTable");
+ } else if (!ignoreIfNotExists) {
+ String tableOrView = isDropTable ? "Table" : "View";
+ throw new ValidationException(String.format(
+ "%s with identifier '%s' does not exist.",
+ tableOrView, objectIdentifier.asSummaryString()));
}
- execute(
- (catalog, path) -> catalog.dropTable(path, ignoreIfNotExists),
- objectIdentifier,
- ignoreIfNotExists,
- "DropTable");
}
/**
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
index 45db348..ae721a9 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
@@ -880,6 +880,75 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase {
}
@Test
+ def testDropTableSameNameWithTemporaryTable(): Unit = {
+ val createTable1 =
+ """
+ |create table t1(
+ | a bigint,
+ | b bigint,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION'
+ |)
+ """.stripMargin
+ val createTable2 =
+ """
+ |create temporary table t1(
+ | a bigint,
+ | b bigint,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION'
+ |)
+ """.stripMargin
+ tableEnv.executeSql(createTable1)
+ tableEnv.executeSql(createTable2)
+
+ expectedEx.expect(classOf[ValidationException])
+ expectedEx.expectMessage("Temporary table with identifier "
+ + "'`default_catalog`.`default_database`.`t1`' exists. "
+ + "Drop it first before removing the permanent table.")
+ tableEnv.executeSql("drop table t1")
+ }
+
+ @Test
+ def testDropViewSameNameWithTable(): Unit = {
+ val createTable1 =
+ """
+ |create table t1(
+ | a bigint,
+ | b bigint,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION'
+ |)
+ """.stripMargin
+ tableEnv.executeSql(createTable1)
+
+ expectedEx.expect(classOf[ValidationException])
+ expectedEx.expectMessage("View with identifier "
+ + "'default_catalog.default_database.t1' does not exist.")
+ tableEnv.executeSql("drop view t1")
+ }
+
+ @Test
+ def testDropViewSameNameWithTableIfNotExists(): Unit = {
+ val createTable1 =
+ """
+ |create table t1(
+ | a bigint,
+ | b bigint,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION'
+ |)
+ """.stripMargin
+ tableEnv.executeSql(createTable1)
+ tableEnv.executeSql("drop view if exists t1")
+ assert(tableEnv.listTables().sameElements(Array("t1")))
+ }
+
+ @Test
def testAlterTable(): Unit = {
val ddl1 =
"""
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index b57f906..5d97ede 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -765,7 +765,7 @@ abstract class TableEnvImpl(
dropViewOperation.getViewIdentifier,
dropViewOperation.isIfExists)
} else {
- catalogManager.dropTable(
+ catalogManager.dropView(
dropViewOperation.getViewIdentifier,
dropViewOperation.isIfExists)
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
index 23a6cd9..ce50b33 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
@@ -23,15 +23,16 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.{BatchTableEnvironment, StreamTableEnvironment}
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment, ValidationException}
import org.apache.flink.table.factories.utils.TestCollectionTableFactory
+import org.apache.flink.test.util.AbstractTestBase
import org.apache.flink.types.Row
import org.junit.Assert.{assertEquals, fail}
+import org.junit.rules.ExpectedException
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
-import org.junit.{Before, Ignore, Test}
+import org.junit.{Before, Ignore, Rule, Test}
import java.util
-import org.apache.flink.test.util.AbstractTestBase
import scala.collection.JavaConversions._
@@ -52,18 +53,17 @@ class CatalogTableITCase(isStreaming: Boolean) extends AbstractTestBase {
toRow(3, "c")
)
- private val DIM_DATA = List(
- toRow(1, "aDim"),
- toRow(2, "bDim"),
- toRow(3, "cDim")
- )
-
implicit def rowOrdering: Ordering[Row] = Ordering.by((r : Row) => {
val builder = new StringBuilder
0 until r.getArity foreach(idx => builder.append(r.getField(idx)))
builder.toString()
})
+ var _expectedEx: ExpectedException = ExpectedException.none
+
+ @Rule
+ def expectedEx: ExpectedException = _expectedEx
+
@Before
def before(): Unit = {
batchExec.setParallelism(4)
@@ -538,6 +538,75 @@ class CatalogTableITCase(isStreaming: Boolean) extends AbstractTestBase {
}
@Test
+ def testDropTableSameNameWithTemporaryTable(): Unit = {
+ val createTable1 =
+ """
+ |create table t1(
+ | a bigint,
+ | b bigint,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION'
+ |)
+ """.stripMargin
+ val createTable2 =
+ """
+ |create temporary table t1(
+ | a bigint,
+ | b bigint,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION'
+ |)
+ """.stripMargin
+ tableEnv.executeSql(createTable1)
+ tableEnv.executeSql(createTable2)
+
+ expectedEx.expect(classOf[ValidationException])
+ expectedEx.expectMessage("Temporary table with identifier "
+ + "'`default_catalog`.`default_database`.`t1`' exists. "
+ + "Drop it first before removing the permanent table.")
+ tableEnv.executeSql("drop table t1")
+ }
+
+ @Test
+ def testDropViewSameNameWithTable(): Unit = {
+ val createTable1 =
+ """
+ |create table t1(
+ | a bigint,
+ | b bigint,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION'
+ |)
+ """.stripMargin
+ tableEnv.executeSql(createTable1)
+
+ expectedEx.expect(classOf[ValidationException])
+ expectedEx.expectMessage("View with identifier "
+ + "'default_catalog.default_database.t1' does not exist.")
+ tableEnv.executeSql("drop view t1")
+ }
+
+ @Test
+ def testDropViewSameNameWithTableIfNotExists(): Unit = {
+ val createTable1 =
+ """
+ |create table t1(
+ | a bigint,
+ | b bigint,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION'
+ |)
+ """.stripMargin
+ tableEnv.executeSql(createTable1)
+ tableEnv.executeSql("drop view if exists t1")
+ assert(tableEnv.listTables().sameElements(Array("t1")))
+ }
+
+ @Test
def testAlterTable(): Unit = {
val ddl1 =
"""