Posted to commits@flink.apache.org by ja...@apache.org on 2020/03/13 01:36:49 UTC

[flink] branch release-1.10 updated: [FLINK-16343][table-planner-blink] Improve exception message when reading an unbounded source in batch mode

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 6a147b5  [FLINK-16343][table-planner-blink] Improve exception message when reading an unbounded source in batch mode
6a147b5 is described below

commit 6a147b594b66e89e412b63fb3e122284616da61d
Author: Jark Wu <ja...@apache.org>
AuthorDate: Thu Mar 12 20:58:13 2020 +0800

    [FLINK-16343][table-planner-blink] Improve exception message when reading an unbounded source in batch mode
    
    This closes #11387
---
 .../planner/plan/schema/CatalogSourceTable.scala   | 19 +++--
 .../table/planner/catalog/CatalogTableITCase.scala | 82 +---------------------
 .../planner/plan/batch/sql/TableScanTest.scala     | 23 +++++-
 .../batch/sql/PartitionableSinkITCase.scala        | 30 ++------
 4 files changed, 41 insertions(+), 113 deletions(-)
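
For reference, a minimal sketch (not part of this commit) of how the new check surfaces to a user program. It assumes Flink 1.10's blink planner running in batch mode and the test-only 'COLLECTION' connector registered by TestCollectionTableFactory, as used in the TableScanTest change below; with any other unbounded source the wording of the error is the same.

    import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment, ValidationException}

    object UnboundedSourceInBatchSketch {
      def main(args: Array[String]): Unit = {
        // Blink planner in batch mode: the mode this validation targets.
        val settings = EnvironmentSettings.newInstance()
          .useBlinkPlanner()
          .inBatchMode()
          .build()
        val tableEnv = TableEnvironment.create(settings)

        // Test-only collection connector, declared unbounded
        // (mirrors the DDL in the new TableScanTest case).
        tableEnv.sqlUpdate(
          """
            |CREATE TABLE src (
            |  ts TIMESTAMP(3),
            |  a INT
            |) WITH (
            |  'connector' = 'COLLECTION',
            |  'is-bounded' = 'false'
            |)
          """.stripMargin)

        try {
          // Planning the query now fails with the improved ValidationException
          // instead of the generic "Catalog tables support only ..." TableException.
          val table = tableEnv.sqlQuery("SELECT * FROM src WHERE a > 1")
          println(tableEnv.explain(table))
        } catch {
          case e: ValidationException =>
            // Expected: "Cannot query on an unbounded source in batch mode,
            // but 'default_catalog.default_database.src' is unbounded."
            println(e.getMessage)
        }
      }
    }
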

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
index 96d35a9..de1558e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
@@ -18,13 +18,12 @@
 
 package org.apache.flink.table.planner.plan.schema
 
-import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.catalog.CatalogTable
 import org.apache.flink.table.factories.{TableFactoryUtil, TableSourceFactory}
 import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkRelBuilder}
 import org.apache.flink.table.planner.catalog.CatalogSchemaTable
 import org.apache.flink.table.sources.{StreamTableSource, TableSource, TableSourceValidation}
-
 import org.apache.calcite.plan.{RelOptSchema, RelOptTable}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
@@ -155,10 +154,20 @@ class CatalogSourceTable[T](
     } else {
       TableFactoryUtil.findAndCreateTableSource(catalogTable)
     }
-    if (!tableSource.isInstanceOf[StreamTableSource[_]]) {
-      throw new TableException("Catalog tables support only "
-        + "StreamTableSource and InputFormatTableSource")
+
+    // validation
+    val tableName = schemaTable.getTableIdentifier.asSummaryString();
+    tableSource match {
+      case ts: StreamTableSource[_] =>
+        if (!schemaTable.isStreamingMode && !ts.isBounded) {
+          throw new ValidationException("Cannot query on an unbounded source in batch mode, " +
+            s"but '$tableName' is unbounded.")
+        }
+      case _ =>
+        throw new ValidationException("Catalog tables only support "
+          + "StreamTableSource and InputFormatTableSource")
     }
+
     tableSource
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
index 9e2d628..5d64efe 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
@@ -34,7 +34,7 @@ import org.junit.Assert.{assertEquals, fail}
 import org.junit.rules.ExpectedException
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.{Before, Ignore, Rule, Test}
+import org.junit.{Before, Rule, Test}
 
 import java.io.File
 import java.util
@@ -646,86 +646,6 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase {
     assertEquals(expected.sorted, TestCollectionTableFactory.RESULT.sorted)
   }
 
-  @Test @Ignore("FLINK-14320") // need to implement
-  def testStreamSourceTableWithRowtime(): Unit = {
-    val sourceData = List(
-      toRow(1, 1000),
-      toRow(2, 2000),
-      toRow(3, 3000)
-    )
-    TestCollectionTableFactory.initData(sourceData, emitInterval = 1000L)
-    val sourceDDL =
-      """
-        |create table t1(
-        |  a timestamp(3),
-        |  b bigint,
-        |  WATERMARK FOR a AS a - interval '1' SECOND
-        |) with (
-        |  'connector' = 'COLLECTION'
-        |)
-      """.stripMargin
-    val sinkDDL =
-      """
-        |create table t2(
-        |  a timestamp(3),
-        |  b bigint
-        |) with (
-        |  'connector' = 'COLLECTION'
-        |)
-      """.stripMargin
-    val query =
-      """
-        |insert into t2
-        |select a, sum(b) from t1 group by TUMBLE(a, INTERVAL '1' SECOND)
-      """.stripMargin
-
-    tableEnv.sqlUpdate(sourceDDL)
-    tableEnv.sqlUpdate(sinkDDL)
-    tableEnv.sqlUpdate(query)
-    execJob("testJob")
-    assertEquals(TestCollectionTableFactory.RESULT.sorted, sourceData.sorted)
-  }
-
-  @Test @Ignore("FLINK-14320") // need to implement
-  def testBatchTableWithRowtime(): Unit = {
-    val sourceData = List(
-      toRow(1, 1000),
-      toRow(2, 2000),
-      toRow(3, 3000)
-    )
-    TestCollectionTableFactory.initData(sourceData, emitInterval = 1000L)
-    val sourceDDL =
-      """
-        |create table t1(
-        |  a timestamp(3),
-        |  b bigint,
-        |  WATERMARK FOR a AS a - interval '1' SECOND
-        |) with (
-        |  'connector' = 'COLLECTION'
-        |)
-      """.stripMargin
-    val sinkDDL =
-      """
-        |create table t2(
-        |  a timestamp(3),
-        |  b bigint
-        |) with (
-        |  'connector' = 'COLLECTION'
-        |)
-      """.stripMargin
-    val query =
-      """
-        |insert into t2
-        |select a, sum(b) from t1 group by TUMBLE(a, INTERVAL '1' SECOND)
-      """.stripMargin
-
-    tableEnv.sqlUpdate(sourceDDL)
-    tableEnv.sqlUpdate(sinkDDL)
-    tableEnv.sqlUpdate(query)
-    execJob("testJob")
-    assertEquals(TestCollectionTableFactory.RESULT.sorted, sourceData.sorted)
-  }
-
   @Test
   def testDropTableWithFullPath(): Unit = {
     val ddl1 =
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.scala
index 6a8039c..48c5d85 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.scala
@@ -19,12 +19,11 @@
 package org.apache.flink.table.planner.plan.batch.sql
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.DataTypes
+import org.apache.flink.table.api.{DataTypes, ValidationException}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.descriptors.{FileSystem, OldCsv, Schema}
 import org.apache.flink.table.planner.expressions.utils.Func0
 import org.apache.flink.table.planner.utils.TableTestBase
-
 import org.junit.{Before, Test}
 
 class TableScanTest extends TableTestBase {
@@ -89,6 +88,26 @@ class TableScanTest extends TableTestBase {
   }
 
   @Test
+  def testScanOnUnboundedSource(): Unit = {
+    util.addTable(
+      """
+        |CREATE TABLE src (
+        |  ts TIMESTAMP(3),
+        |  a INT,
+        |  b DOUBLE,
+        |  WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND
+        |) WITH (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin)
+    thrown.expect(classOf[ValidationException])
+    thrown.expectMessage("Cannot query on an unbounded source in batch mode, " +
+      "but 'default_catalog.default_database.src' is unbounded")
+    util.verifyPlan("SELECT * FROM src WHERE a > 1")
+  }
+
+  @Test
   def testDDLWithComputedColumn(): Unit = {
     util.verifyPlan("SELECT * FROM computed_column_t")
   }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
index 12579dc..2b714bf 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
@@ -23,21 +23,19 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
 import org.apache.flink.table.api.config.ExecutionConfigOptions
-import org.apache.flink.table.api.{SqlDialect, TableEnvironment, TableException, TableSchema, ValidationException}
+import org.apache.flink.table.api.{TableEnvironment, TableException, TableSchema, ValidationException}
 import org.apache.flink.table.catalog.{CatalogTableImpl, ObjectPath}
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE
 import org.apache.flink.table.descriptors.DescriptorProperties
 import org.apache.flink.table.descriptors.Schema.SCHEMA
-import org.apache.flink.table.factories.{TableSinkFactory, TableSourceFactory}
+import org.apache.flink.table.factories.TableSinkFactory
 import org.apache.flink.table.planner.runtime.batch.sql.PartitionableSinkITCase._
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.TestData._
 import org.apache.flink.table.sinks.{PartitionableTableSink, StreamTableSink, TableSink}
-import org.apache.flink.table.sources.{StreamTableSource, TableSource}
 import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
 import org.apache.flink.types.Row
 
@@ -168,7 +166,7 @@ class PartitionableSinkITCase extends BatchTestBase {
     expectedEx.expect(classOf[ValidationException])
     registerTableSink(tableName = "sinkTable2", rowType = type4,
       partitionColumns = Array("a", "b"))
-    tEnv.sqlUpdate("insert into sinkTable2 partition(c=1) select a, b from sinkTable2")
+    tEnv.sqlUpdate("insert into sinkTable2 partition(c=1) select a, b from sortTable")
     tEnv.execute("testJob")
   }
 
@@ -176,7 +174,7 @@ class PartitionableSinkITCase extends BatchTestBase {
   def testInsertStaticPartitionOnNonPartitionedSink(): Unit = {
     expectedEx.expect(classOf[TableException])
     registerTableSink(tableName = "sinkTable2", rowType = type4, partitionColumns = Array())
-    tEnv.sqlUpdate("insert into sinkTable2 partition(c=1) select a, b from sinkTable2")
+    tEnv.sqlUpdate("insert into sinkTable2 partition(c=1) select a, b from sortTable")
     tEnv.execute("testJob")
   }
 
@@ -321,7 +319,7 @@ private class TestSink(
   }
 }
 
-class TestPartitionableSinkFactory extends TableSinkFactory[Row] with TableSourceFactory[Row] {
+class TestPartitionableSinkFactory extends TableSinkFactory[Row] {
 
   override def requiredContext(): util.Map[String, String] = {
     val context = new util.HashMap[String, String]()
@@ -349,23 +347,5 @@ class TestPartitionableSinkFactory extends TableSinkFactory[Row] with TableSourc
       supportsGrouping,
       partitionColumns.asScala.toArray[String])
   }
-
-  /**
-    * Remove it after FLINK-14387.
-    */
-  override def createTableSource(properties: JMap[String, String]): TableSource[Row] = {
-    val dp = new DescriptorProperties()
-    dp.putProperties(properties)
-
-    new StreamTableSource[Row] {
-      override def getTableSchema: TableSchema = {
-        dp.getTableSchema(SCHEMA)
-      }
-
-      override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
-        throw new RuntimeException
-      }
-    }
-  }
 }