Posted to commits@flink.apache.org by ja...@apache.org on 2020/03/13 01:36:49 UTC
[flink] branch release-1.10 updated: [FLINK-16343][table-planner-blink] Improve exception message when reading an unbounded source in batch mode
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new 6a147b5 [FLINK-16343][table-planner-blink] Improve exception message when reading an unbounded source in batch mode
6a147b5 is described below
commit 6a147b594b66e89e412b63fb3e122284616da61d
Author: Jark Wu <ja...@apache.org>
AuthorDate: Thu Mar 12 20:58:13 2020 +0800
[FLINK-16343][table-planner-blink] Improve exception message when reading an unbounded source in batch mode
This closes #11387
---
.../planner/plan/schema/CatalogSourceTable.scala | 19 +++--
.../table/planner/catalog/CatalogTableITCase.scala | 82 +---------------------
.../planner/plan/batch/sql/TableScanTest.scala | 23 +++++-
.../batch/sql/PartitionableSinkITCase.scala | 30 ++------
4 files changed, 41 insertions(+), 113 deletions(-)
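From the user side, this change replaces the generic TableException with an eager ValidationException when a batch query scans an unbounded source. A minimal sketch of how the failure now surfaces, assuming the planner's test-only 'COLLECTION' connector and its 'is-bounded' property (as used in TableScanTest below); the object name and schema are illustrative:

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment, ValidationException}

object UnboundedBatchScanSketch {
  def main(args: Array[String]): Unit = {
    // Blink planner in batch mode, the combination this change targets.
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
    val tEnv = TableEnvironment.create(settings)

    // Test-only connector; any factory yielding an unbounded StreamTableSource
    // takes the same code path through CatalogSourceTable.
    tEnv.sqlUpdate(
      """
        |CREATE TABLE src (
        |  a INT,
        |  b DOUBLE
        |) WITH (
        |  'connector' = 'COLLECTION',
        |  'is-bounded' = 'false'
        |)
      """.stripMargin)

    try {
      // Planning the scan now fails eagerly instead of at execution time.
      tEnv.explain(tEnv.sqlQuery("SELECT * FROM src WHERE a > 1"))
    } catch {
      case e: ValidationException =>
        // Cannot query on an unbounded source in batch mode,
        // but 'default_catalog.default_database.src' is unbounded.
        println(e.getMessage)
    }
  }
}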
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
index 96d35a9..de1558e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
@@ -18,13 +18,12 @@
package org.apache.flink.table.planner.plan.schema
-import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.{TableException, ValidationException}
import org.apache.flink.table.catalog.CatalogTable
import org.apache.flink.table.factories.{TableFactoryUtil, TableSourceFactory}
import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkRelBuilder}
import org.apache.flink.table.planner.catalog.CatalogSchemaTable
import org.apache.flink.table.sources.{StreamTableSource, TableSource, TableSourceValidation}
-
import org.apache.calcite.plan.{RelOptSchema, RelOptTable}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
@@ -155,10 +154,20 @@ class CatalogSourceTable[T](
} else {
TableFactoryUtil.findAndCreateTableSource(catalogTable)
}
- if (!tableSource.isInstanceOf[StreamTableSource[_]]) {
- throw new TableException("Catalog tables support only "
- + "StreamTableSource and InputFormatTableSource")
+
+ // validation
+ val tableName = schemaTable.getTableIdentifier.asSummaryString()
+ tableSource match {
+ case ts: StreamTableSource[_] =>
+ if (!schemaTable.isStreamingMode && !ts.isBounded) {
+ throw new ValidationException("Cannot query on an unbounded source in batch mode, " +
+ s"but '$tableName' is unbounded.")
+ }
+ case _ =>
+ throw new ValidationException("Catalog tables only support "
+ + "StreamTableSource and InputFormatTableSource")
}
+
tableSource
}
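Note that the match on StreamTableSource still admits InputFormatTableSource, which implements StreamTableSource with isBounded() returning true, so bounded sources keep planning in batch mode. A hypothetical bounded source for illustration (class name and schema are made up):

import java.util.Collections

import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, TableSchema}
import org.apache.flink.table.sources.StreamTableSource
import org.apache.flink.types.Row

// Passes the new batch-mode check because isBounded is true; a source
// returning false is now rejected with the ValidationException above
// instead of failing later at runtime.
class BoundedExampleSource extends StreamTableSource[Row] {
  private val rowType = new RowTypeInfo(Types.INT)

  override def isBounded: Boolean = true

  override def getTableSchema: TableSchema =
    TableSchema.builder().field("a", DataTypes.INT()).build()

  override def getReturnType: TypeInformation[Row] = rowType

  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] =
    execEnv.fromCollection(Collections.singletonList(Row.of(Integer.valueOf(1))), rowType)
}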
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
index 9e2d628..5d64efe 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
@@ -34,7 +34,7 @@ import org.junit.Assert.{assertEquals, fail}
import org.junit.rules.ExpectedException
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
-import org.junit.{Before, Ignore, Rule, Test}
+import org.junit.{Before, Rule, Test}
import java.io.File
import java.util
@@ -646,86 +646,6 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase {
assertEquals(expected.sorted, TestCollectionTableFactory.RESULT.sorted)
}
- @Test @Ignore("FLINK-14320") // need to implement
- def testStreamSourceTableWithRowtime(): Unit = {
- val sourceData = List(
- toRow(1, 1000),
- toRow(2, 2000),
- toRow(3, 3000)
- )
- TestCollectionTableFactory.initData(sourceData, emitInterval = 1000L)
- val sourceDDL =
- """
- |create table t1(
- | a timestamp(3),
- | b bigint,
- | WATERMARK FOR a AS a - interval '1' SECOND
- |) with (
- | 'connector' = 'COLLECTION'
- |)
- """.stripMargin
- val sinkDDL =
- """
- |create table t2(
- | a timestamp(3),
- | b bigint
- |) with (
- | 'connector' = 'COLLECTION'
- |)
- """.stripMargin
- val query =
- """
- |insert into t2
- |select a, sum(b) from t1 group by TUMBLE(a, INTERVAL '1' SECOND)
- """.stripMargin
-
- tableEnv.sqlUpdate(sourceDDL)
- tableEnv.sqlUpdate(sinkDDL)
- tableEnv.sqlUpdate(query)
- execJob("testJob")
- assertEquals(TestCollectionTableFactory.RESULT.sorted, sourceData.sorted)
- }
-
- @Test @Ignore("FLINK-14320") // need to implement
- def testBatchTableWithRowtime(): Unit = {
- val sourceData = List(
- toRow(1, 1000),
- toRow(2, 2000),
- toRow(3, 3000)
- )
- TestCollectionTableFactory.initData(sourceData, emitInterval = 1000L)
- val sourceDDL =
- """
- |create table t1(
- | a timestamp(3),
- | b bigint,
- | WATERMARK FOR a AS a - interval '1' SECOND
- |) with (
- | 'connector' = 'COLLECTION'
- |)
- """.stripMargin
- val sinkDDL =
- """
- |create table t2(
- | a timestamp(3),
- | b bigint
- |) with (
- | 'connector' = 'COLLECTION'
- |)
- """.stripMargin
- val query =
- """
- |insert into t2
- |select a, sum(b) from t1 group by TUMBLE(a, INTERVAL '1' SECOND)
- """.stripMargin
-
- tableEnv.sqlUpdate(sourceDDL)
- tableEnv.sqlUpdate(sinkDDL)
- tableEnv.sqlUpdate(query)
- execJob("testJob")
- assertEquals(TestCollectionTableFactory.RESULT.sorted, sourceData.sorted)
- }
-
@Test
def testDropTableWithFullPath(): Unit = {
val ddl1 =
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.scala
index 6a8039c..48c5d85 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.scala
@@ -19,12 +19,11 @@
package org.apache.flink.table.planner.plan.batch.sql
import org.apache.flink.api.scala._
-import org.apache.flink.table.api.DataTypes
+import org.apache.flink.table.api.{DataTypes, ValidationException}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{FileSystem, OldCsv, Schema}
import org.apache.flink.table.planner.expressions.utils.Func0
import org.apache.flink.table.planner.utils.TableTestBase
-
import org.junit.{Before, Test}
class TableScanTest extends TableTestBase {
@@ -89,6 +88,26 @@ class TableScanTest extends TableTestBase {
}
@Test
+ def testScanOnUnboundedSource(): Unit = {
+ util.addTable(
+ """
+ |CREATE TABLE src (
+ | ts TIMESTAMP(3),
+ | a INT,
+ | b DOUBLE,
+ | WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND
+ |) WITH (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin)
+ thrown.expect(classOf[ValidationException])
+ thrown.expectMessage("Cannot query on an unbounded source in batch mode, " +
+ "but 'default_catalog.default_database.src' is unbounded")
+ util.verifyPlan("SELECT * FROM src WHERE a > 1")
+ }
+
+ @Test
def testDDLWithComputedColumn(): Unit = {
util.verifyPlan("SELECT * FROM computed_column_t")
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
index 12579dc..2b714bf 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
@@ -23,21 +23,19 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.table.api.config.ExecutionConfigOptions
-import org.apache.flink.table.api.{SqlDialect, TableEnvironment, TableException, TableSchema, ValidationException}
+import org.apache.flink.table.api.{TableEnvironment, TableException, TableSchema, ValidationException}
import org.apache.flink.table.catalog.{CatalogTableImpl, ObjectPath}
import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE
import org.apache.flink.table.descriptors.DescriptorProperties
import org.apache.flink.table.descriptors.Schema.SCHEMA
-import org.apache.flink.table.factories.{TableSinkFactory, TableSourceFactory}
+import org.apache.flink.table.factories.TableSinkFactory
import org.apache.flink.table.planner.runtime.batch.sql.PartitionableSinkITCase._
import org.apache.flink.table.planner.runtime.utils.BatchTestBase
import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
import org.apache.flink.table.planner.runtime.utils.TestData._
import org.apache.flink.table.sinks.{PartitionableTableSink, StreamTableSink, TableSink}
-import org.apache.flink.table.sources.{StreamTableSource, TableSource}
import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
import org.apache.flink.types.Row
@@ -168,7 +166,7 @@ class PartitionableSinkITCase extends BatchTestBase {
expectedEx.expect(classOf[ValidationException])
registerTableSink(tableName = "sinkTable2", rowType = type4,
partitionColumns = Array("a", "b"))
- tEnv.sqlUpdate("insert into sinkTable2 partition(c=1) select a, b from sinkTable2")
+ tEnv.sqlUpdate("insert into sinkTable2 partition(c=1) select a, b from sortTable")
tEnv.execute("testJob")
}
@@ -176,7 +174,7 @@ class PartitionableSinkITCase extends BatchTestBase {
def testInsertStaticPartitionOnNonPartitionedSink(): Unit = {
expectedEx.expect(classOf[TableException])
registerTableSink(tableName = "sinkTable2", rowType = type4, partitionColumns = Array())
- tEnv.sqlUpdate("insert into sinkTable2 partition(c=1) select a, b from sinkTable2")
+ tEnv.sqlUpdate("insert into sinkTable2 partition(c=1) select a, b from sortTable")
tEnv.execute("testJob")
}
@@ -321,7 +319,7 @@ private class TestSink(
}
}
-class TestPartitionableSinkFactory extends TableSinkFactory[Row] with TableSourceFactory[Row] {
+class TestPartitionableSinkFactory extends TableSinkFactory[Row] {
override def requiredContext(): util.Map[String, String] = {
val context = new util.HashMap[String, String]()
@@ -349,23 +347,5 @@ class TestPartitionableSinkFactory extends TableSinkFactory[Row] with TableSourc
supportsGrouping,
partitionColumns.asScala.toArray[String])
}
-
- /**
- * Remove it after FLINK-14387.
- */
- override def createTableSource(properties: JMap[String, String]): TableSource[Row] = {
- val dp = new DescriptorProperties()
- dp.putProperties(properties)
-
- new StreamTableSource[Row] {
- override def getTableSchema: TableSchema = {
- dp.getTableSchema(SCHEMA)
- }
-
- override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
- throw new RuntimeException
- }
- }
- }
}