You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/07 14:37:17 UTC
[flink] 05/05: [FLINK-13495][table-planner-blink] Add table source
and table sink it case using varchar/char/decimal precision
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5b90b16b1d2d4dc48a0edfd12844ab1ada871586
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Fri Aug 2 18:12:53 2019 +0200
[FLINK-13495][table-planner-blink] Add table source and table sink it case using varchar/char/decimal precision
---
.../runtime/batch/sql/TableSourceITCase.scala | 28 ++++++-
.../runtime/batch/table/TableSinkITCase.scala | 88 ++++++++++++++++++++++
.../runtime/stream/sql/TableSourceITCase.scala | 69 ++++++++++++++++-
.../runtime/stream/table/TableSinkITCase.scala | 33 +++++++-
.../planner/utils/MemoryTableSourceSinkUtil.scala | 31 ++++++++
.../table/planner/utils/testTableSources.scala | 35 +++++++++
6 files changed, 281 insertions(+), 3 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
index 66ec0a1..dcff996 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.api.{DataTypes, TableSchema, Types}
import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData}
-import org.apache.flink.table.planner.utils.{TestFilterableTableSource, TestInputFormatTableSource, TestNestedProjectableTableSource, TestPartitionableTableSource, TestProjectableTableSource, TestTableSources}
+import org.apache.flink.table.planner.utils.{TestDataTypeTableSource, TestFilterableTableSource, TestInputFormatTableSource, TestNestedProjectableTableSource, TestPartitionableTableSource, TestProjectableTableSource, TestTableSources}
import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter
import org.apache.flink.types.Row
@@ -221,4 +221,30 @@ class TableSourceITCase extends BatchTestBase {
row(3, "Hello world"))
)
}
+
+ @Test
+ def testDecimalSource(): Unit = {
+ val tableSchema = TableSchema.builder().fields(
+ Array("a", "b", "c", "d"),
+ Array(
+ DataTypes.INT(),
+ DataTypes.DECIMAL(5, 2),
+ DataTypes.VARCHAR(5),
+ DataTypes.CHAR(5))).build()
+ val tableSource = new TestDataTypeTableSource(
+ tableSchema,
+ Seq(
+ row(1, new java.math.BigDecimal(5.1), "1", "1"),
+ row(2, new java.math.BigDecimal(6.1), "12", "12"),
+ row(3, new java.math.BigDecimal(7.1), "123", "123")
+ ))
+ tEnv.registerTableSource("MyInputFormatTable", tableSource)
+ checkResult(
+ "SELECT a, b, c, d FROM MyInputFormatTable",
+ Seq(
+ row(1, "5.10", "1", "1"),
+ row(2, "6.10", "12", "12"),
+ row(3, "7.10", "123", "123"))
+ )
+ }
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/TableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/TableSinkITCase.scala
new file mode 100644
index 0000000..75b3395
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/TableSinkITCase.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.table
+
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{DataTypes, TableSchema}
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase
+import org.apache.flink.table.planner.runtime.utils.TestData.{data3, nullablesOfData3, type3}
+import org.apache.flink.table.planner.utils.MemoryTableSourceSinkUtil
+import org.apache.flink.table.planner.utils.MemoryTableSourceSinkUtil.{DataTypeAppendStreamTableSink, DataTypeOutputFormatTableSink}
+import org.apache.flink.test.util.TestBaseUtils
+
+import org.junit._
+
+import scala.collection.JavaConverters._
+
+class TableSinkITCase extends BatchTestBase {
+
+ @Test
+ def testDecimalOutputFormatTableSink(): Unit = {
+ MemoryTableSourceSinkUtil.clear()
+
+ val schema = TableSchema.builder()
+ .field("c", DataTypes.VARCHAR(5))
+ .field("b", DataTypes.DECIMAL(10, 0))
+ .field("d", DataTypes.CHAR(5))
+ .build()
+ val sink = new DataTypeOutputFormatTableSink(schema)
+ tEnv.registerTableSink("testSink", sink)
+
+ registerCollection("Table3", data3, type3, "a, b, c", nullablesOfData3)
+
+ tEnv.scan("Table3")
+ .where('a > 20)
+ .select("12345", 55.cast(DataTypes.DECIMAL(10, 0)), "12345".cast(DataTypes.CHAR(5)))
+ .insertInto("testSink")
+
+ tEnv.execute("")
+
+ val results = MemoryTableSourceSinkUtil.tableDataStrings.asJava
+ val expected = Seq("12345,55,12345").mkString("\n")
+
+ TestBaseUtils.compareResultAsText(results, expected)
+ }
+
+ @Test
+ def testDecimalAppendStreamTableSink(): Unit = {
+ MemoryTableSourceSinkUtil.clear()
+
+ val schema = TableSchema.builder()
+ .field("c", DataTypes.VARCHAR(5))
+ .field("b", DataTypes.DECIMAL(10, 0))
+ .field("d", DataTypes.CHAR(5))
+ .build()
+ val sink = new DataTypeAppendStreamTableSink(schema)
+ tEnv.registerTableSink("testSink", sink)
+
+ registerCollection("Table3", data3, type3, "a, b, c", nullablesOfData3)
+
+ tEnv.scan("Table3")
+ .where('a > 20)
+ .select("12345", 55.cast(DataTypes.DECIMAL(10, 0)), "12345".cast(DataTypes.CHAR(5)))
+ .insertInto("testSink")
+
+ tEnv.execute("")
+
+ val results = MemoryTableSourceSinkUtil.tableDataStrings.asJava
+ val expected = Seq("12345,55,12345").mkString("\n")
+
+ TestBaseUtils.compareResultAsText(results, expected)
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala
index b1faaad..4cf4969 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala
@@ -23,8 +23,9 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{DataTypes, TableSchema, Types}
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
import org.apache.flink.table.planner.runtime.utils.{StreamingTestBase, TestData, TestingAppendSink}
-import org.apache.flink.table.planner.utils.{TestFilterableTableSource, TestInputFormatTableSource, TestNestedProjectableTableSource, TestPartitionableTableSource, TestProjectableTableSource, TestTableSources}
+import org.apache.flink.table.planner.utils.{TestDataTypeTableSource, TestFilterableTableSource, TestInputFormatTableSource, TestNestedProjectableTableSource, TestPartitionableTableSource, TestProjectableTableSource, TestStreamTableSource, TestTableSources}
import org.apache.flink.types.Row
import org.junit.Assert._
@@ -402,4 +403,70 @@ class TableSourceITCase extends StreamingTestBase {
)
assertEquals(expected.sorted, sink.getAppendResults.sorted)
}
+
+ @Test
+ def testDecimalSource(): Unit = {
+ val tableSchema = TableSchema.builder().fields(
+ Array("a", "b", "c", "d"),
+ Array(
+ DataTypes.INT(),
+ DataTypes.DECIMAL(5, 2),
+ DataTypes.VARCHAR(5),
+ DataTypes.CHAR(5))).build()
+ val tableSource = new TestDataTypeTableSource(
+ tableSchema,
+ Seq(
+ row(1, new java.math.BigDecimal(5.1), "1", "1"),
+ row(2, new java.math.BigDecimal(6.1), "12", "12"),
+ row(3, new java.math.BigDecimal(7.1), "123", "123")
+ ))
+ tEnv.registerTableSource("MyInputFormatTable", tableSource)
+
+ val sink = new TestingAppendSink()
+ tEnv.sqlQuery("SELECT a, b, c, d FROM MyInputFormatTable").toAppendStream[Row].addSink(sink)
+
+ env.execute()
+
+ val expected = Seq(
+ "1,5.10,1,1",
+ "2,6.10,12,12",
+ "3,7.10,123,123"
+ )
+ assertEquals(expected.sorted, sink.getAppendResults.sorted)
+ }
+
+ /**
+ * StreamTableSource must use type info in DataStream, so it will lose precision.
+ * Only decimals with the default precision are supported.
+ */
+ @Test
+ def testLegacyDecimalSourceUsingStreamTableSource(): Unit = {
+ val tableSchema = new TableSchema(
+ Array("a", "b", "c"),
+ Array(
+ Types.INT(),
+ Types.DECIMAL(),
+ Types.STRING()
+ ))
+ val tableSource = new TestStreamTableSource(
+ tableSchema,
+ Seq(
+ row(1, new java.math.BigDecimal(5.1), "1"),
+ row(2, new java.math.BigDecimal(6.1), "12"),
+ row(3, new java.math.BigDecimal(7.1), "123")
+ ))
+ tEnv.registerTableSource("MyInputFormatTable", tableSource)
+
+ val sink = new TestingAppendSink()
+ tEnv.sqlQuery("SELECT a, b, c FROM MyInputFormatTable").toAppendStream[Row].addSink(sink)
+
+ env.execute()
+
+ val expected = Seq(
+ "1,5.099999999999999645,1",
+ "2,6.099999999999999645,12",
+ "3,7.099999999999999645,123"
+ )
+ assertEquals(expected.sorted, sink.getAppendResults.sorted)
+ }
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
index c013308..c889149 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
@@ -23,13 +23,15 @@ import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{TableException, Tumble, Types}
+import org.apache.flink.table.api.{DataTypes, TableException, TableSchema, Tumble, Types}
import org.apache.flink.table.planner.runtime.utils.TestData.{smallTupleData3, tupleData3, tupleData5}
import org.apache.flink.table.planner.runtime.utils.{TestingAppendTableSink, TestingRetractTableSink, TestingUpsertTableSink}
+import org.apache.flink.table.planner.utils.MemoryTableSourceSinkUtil.{DataTypeAppendStreamTableSink, DataTypeOutputFormatTableSink}
import org.apache.flink.table.planner.utils.{MemoryTableSourceSinkUtil, TableTestUtil}
import org.apache.flink.table.sinks._
import org.apache.flink.test.util.{AbstractTestBase, TestBaseUtils}
import org.apache.flink.types.Row
+
import org.junit.Assert._
import org.junit.Test
@@ -550,4 +552,33 @@ class TableSinkITCase extends AbstractTestBase {
r.toRetractStream[Row]
}
+
+ @Test
+ def testDecimalAppendStreamTableSink(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
+
+ MemoryTableSourceSinkUtil.clear()
+
+ val schema = TableSchema.builder()
+ .field("c", DataTypes.VARCHAR(5))
+ .field("b", DataTypes.DECIMAL(10, 0))
+ .field("d", DataTypes.CHAR(5))
+ .build()
+ val sink = new DataTypeAppendStreamTableSink(schema)
+ tEnv.registerTableSink("testSink", sink)
+
+ env.fromCollection(tupleData3)
+ .toTable(tEnv, 'a, 'b, 'c)
+ .where('a > 20)
+ .select("12345", 55.cast(DataTypes.DECIMAL(10, 0)), "12345".cast(DataTypes.CHAR(5)))
+ .insertInto("testSink")
+
+ tEnv.execute("")
+
+ val results = MemoryTableSourceSinkUtil.tableDataStrings.asJava
+ val expected = Seq("12345,55,12345").mkString("\n")
+
+ TestBaseUtils.compareResultAsText(results, expected)
+ }
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/MemoryTableSourceSinkUtil.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/MemoryTableSourceSinkUtil.scala
index e64dd21..0d263f6 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/MemoryTableSourceSinkUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/MemoryTableSourceSinkUtil.scala
@@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont
import org.apache.flink.table.api.TableSchema
import org.apache.flink.table.sinks.{AppendStreamTableSink, OutputFormatTableSink, TableSink, TableSinkBase}
import org.apache.flink.table.sources._
+import org.apache.flink.table.types.DataType
import org.apache.flink.table.util.TableConnectorUtil
import org.apache.flink.types.Row
@@ -163,4 +164,34 @@ object MemoryTableSourceSinkUtil {
override def close(): Unit = {}
}
+
+ final class DataTypeOutputFormatTableSink(
+ schema: TableSchema) extends OutputFormatTableSink[Row] {
+
+ override def getConsumedDataType: DataType = schema.toRowDataType
+
+ override def getOutputFormat: OutputFormat[Row] = new MemoryCollectionOutputFormat
+
+ override def getTableSchema: TableSchema = schema
+
+ override def configure(
+ fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = this
+ }
+
+ final class DataTypeAppendStreamTableSink(
+ schema: TableSchema) extends AppendStreamTableSink[Row] {
+
+ override def getConsumedDataType: DataType = schema.toRowDataType
+
+ override def getTableSchema: TableSchema = schema
+
+ override def configure(
+ fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = this
+
+ override def consumeDataStream(dataStream: DataStream[Row]): DataStreamSink[_] = {
+ dataStream.writeUsingOutputFormat(new MemoryCollectionOutputFormat)
+ }
+
+ override def emitDataStream(dataStream: DataStream[Row]): Unit = ???
+ }
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala
index d2dcb1f..44cb4eb 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner.utils
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.io.InputFormat
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.java.io.CollectionInputFormat
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.core.io.InputSplit
@@ -33,6 +34,7 @@ import org.apache.flink.table.functions.BuiltInFunctionDefinitions
import org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND
import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
import org.apache.flink.table.planner.runtime.utils.TimeTestUtil.EventTimeSourceFunction
+import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
import org.apache.flink.table.sources._
import org.apache.flink.table.sources.tsextractors.ExistingField
import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, PreserveWatermarks}
@@ -620,3 +622,36 @@ class TestInputFormatTableSource[T](
override def getTableSchema: TableSchema = tableSchema
}
+
+class TestDataTypeTableSource(
+ tableSchema: TableSchema,
+ values: Seq[Row]) extends InputFormatTableSource[Row] {
+
+ override def getInputFormat: InputFormat[Row, _ <: InputSplit] = {
+ new CollectionInputFormat[Row](
+ values.asJava,
+ fromDataTypeToTypeInfo(getProducedDataType)
+ .createSerializer(new ExecutionConfig)
+ .asInstanceOf[TypeSerializer[Row]])
+ }
+
+ override def getReturnType: TypeInformation[Row] =
+ throw new RuntimeException("Should not invoke this deprecated method.")
+
+ override def getProducedDataType: DataType = tableSchema.toRowDataType
+
+ override def getTableSchema: TableSchema = tableSchema
+}
+
+class TestStreamTableSource(
+ tableSchema: TableSchema,
+ values: Seq[Row]) extends StreamTableSource[Row] {
+
+ override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
+ execEnv.fromCollection(values, tableSchema.toRowType)
+ }
+
+ override def getReturnType: TypeInformation[Row] = tableSchema.toRowType
+
+ override def getTableSchema: TableSchema = tableSchema
+}