Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/07 14:37:17 UTC

[flink] 05/05: [FLINK-13495][table-planner-blink] Add table source and table sink IT cases using varchar/char/decimal precision

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5b90b16b1d2d4dc48a0edfd12844ab1ada871586
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Fri Aug 2 18:12:53 2019 +0200

    [FLINK-13495][table-planner-blink] Add table source and table sink IT cases using varchar/char/decimal precision
---
 .../runtime/batch/sql/TableSourceITCase.scala      | 28 ++++++-
 .../runtime/batch/table/TableSinkITCase.scala      | 88 ++++++++++++++++++++++
 .../runtime/stream/sql/TableSourceITCase.scala     | 69 ++++++++++++++++-
 .../runtime/stream/table/TableSinkITCase.scala     | 33 +++++++-
 .../planner/utils/MemoryTableSourceSinkUtil.scala  | 31 ++++++++
 .../table/planner/utils/testTableSources.scala     | 35 +++++++++
 6 files changed, 281 insertions(+), 3 deletions(-)
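
Every test in this commit follows the same pattern: declare a TableSchema whose
fields carry explicit precision, then check that the precision survives the
round trip through a source or a sink. A minimal sketch of the declaration the
tests below use (comments added here for orientation):

    val tableSchema = TableSchema.builder().fields(
      Array("a", "b", "c", "d"),
      Array(
        DataTypes.INT(),
        DataTypes.DECIMAL(5, 2),   // precision 5, scale 2
        DataTypes.VARCHAR(5),      // maximum length 5
        DataTypes.CHAR(5))         // fixed length 5
    ).build()

    // tableSchema.toRowDataType keeps these attributes; the legacy
    // TypeInformation-based tableSchema.toRowType does not.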

diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
index 66ec0a1..dcff996 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.table.api.{DataTypes, TableSchema, Types}
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData}
-import org.apache.flink.table.planner.utils.{TestFilterableTableSource, TestInputFormatTableSource, TestNestedProjectableTableSource, TestPartitionableTableSource, TestProjectableTableSource, TestTableSources}
+import org.apache.flink.table.planner.utils.{TestDataTypeTableSource, TestFilterableTableSource, TestInputFormatTableSource, TestNestedProjectableTableSource, TestPartitionableTableSource, TestProjectableTableSource, TestTableSources}
 import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter
 import org.apache.flink.types.Row
 
@@ -221,4 +221,30 @@ class TableSourceITCase extends BatchTestBase {
         row(3, "Hello world"))
     )
   }
+
+  @Test
+  def testDecimalSource(): Unit = {
+    val tableSchema = TableSchema.builder().fields(
+      Array("a", "b", "c", "d"),
+      Array(
+        DataTypes.INT(),
+        DataTypes.DECIMAL(5, 2),
+        DataTypes.VARCHAR(5),
+        DataTypes.CHAR(5))).build()
+    val tableSource = new TestDataTypeTableSource(
+      tableSchema,
+      Seq(
+        row(1, new java.math.BigDecimal(5.1), "1", "1"),
+        row(2, new java.math.BigDecimal(6.1), "12", "12"),
+        row(3, new java.math.BigDecimal(7.1), "123", "123")
+      ))
+    tEnv.registerTableSource("MyInputFormatTable", tableSource)
+    checkResult(
+      "SELECT a, b, c, d FROM MyInputFormatTable",
+      Seq(
+        row(1, "5.10", "1", "1"),
+        row(2, "6.10", "12", "12"),
+        row(3, "7.10", "123", "123"))
+    )
+  }
 }
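
A note on the expected "5.10" above: the test feeds new java.math.BigDecimal(5.1),
i.e. the double constructor, which captures the binary approximation of 5.1. The
declared DECIMAL(5, 2) then brings the value to scale 2. A JDK-only illustration
(the exact rounding mode used by the planner is an assumption here; HALF_UP
reproduces the expected strings):

    // the double constructor keeps the full binary expansion of 5.1
    new java.math.BigDecimal(5.1)
    // => 5.0999999999999996447286321199499070644378662109375

    // rounding to the declared scale of DECIMAL(5, 2)
    new java.math.BigDecimal(5.1).setScale(2, java.math.RoundingMode.HALF_UP)
    // => 5.10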
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/TableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/TableSinkITCase.scala
new file mode 100644
index 0000000..75b3395
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/TableSinkITCase.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.table
+
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{DataTypes, TableSchema}
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase
+import org.apache.flink.table.planner.runtime.utils.TestData.{data3, nullablesOfData3, type3}
+import org.apache.flink.table.planner.utils.MemoryTableSourceSinkUtil
+import org.apache.flink.table.planner.utils.MemoryTableSourceSinkUtil.{DataTypeAppendStreamTableSink, DataTypeOutputFormatTableSink}
+import org.apache.flink.test.util.TestBaseUtils
+
+import org.junit._
+
+import scala.collection.JavaConverters._
+
+class TableSinkITCase extends BatchTestBase {
+
+  @Test
+  def testDecimalOutputFormatTableSink(): Unit = {
+    MemoryTableSourceSinkUtil.clear()
+
+    val schema = TableSchema.builder()
+        .field("c", DataTypes.VARCHAR(5))
+        .field("b", DataTypes.DECIMAL(10, 0))
+        .field("d", DataTypes.CHAR(5))
+        .build()
+    val sink = new DataTypeOutputFormatTableSink(schema)
+    tEnv.registerTableSink("testSink", sink)
+
+    registerCollection("Table3", data3, type3, "a, b, c", nullablesOfData3)
+
+    tEnv.scan("Table3")
+        .where('a > 20)
+        .select("12345", 55.cast(DataTypes.DECIMAL(10, 0)), "12345".cast(DataTypes.CHAR(5)))
+        .insertInto("testSink")
+
+    tEnv.execute("")
+
+    val results = MemoryTableSourceSinkUtil.tableDataStrings.asJava
+    val expected = Seq("12345,55,12345").mkString("\n")
+
+    TestBaseUtils.compareResultAsText(results, expected)
+  }
+
+  @Test
+  def testDecimalAppendStreamTableSink(): Unit = {
+    MemoryTableSourceSinkUtil.clear()
+
+    val schema = TableSchema.builder()
+        .field("c", DataTypes.VARCHAR(5))
+        .field("b", DataTypes.DECIMAL(10, 0))
+        .field("d", DataTypes.CHAR(5))
+        .build()
+    val sink = new DataTypeAppendStreamTableSink(schema)
+    tEnv.registerTableSink("testSink", sink)
+
+    registerCollection("Table3", data3, type3, "a, b, c", nullablesOfData3)
+
+    tEnv.scan("Table3")
+        .where('a > 20)
+        .select("12345", 55.cast(DataTypes.DECIMAL(10, 0)), "12345".cast(DataTypes.CHAR(5)))
+        .insertInto("testSink")
+
+    tEnv.execute("")
+
+    val results = MemoryTableSourceSinkUtil.tableDataStrings.asJava
+    val expected = Seq("12345,55,12345").mkString("\n")
+
+    TestBaseUtils.compareResultAsText(results, expected)
+  }
+}
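
Both tests read their results back through MemoryCollectionOutputFormat, whose
body is not part of this diff. A hypothetical sketch, assuming it appends
stringified rows to the buffer behind MemoryTableSourceSinkUtil.tableDataStrings
(the real class may differ):

    import org.apache.flink.api.common.io.OutputFormat
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.types.Row

    // hypothetical stand-in for MemoryCollectionOutputFormat
    final class CollectingOutputFormat extends OutputFormat[Row] {
      override def configure(parameters: Configuration): Unit = {}
      override def open(taskNumber: Int, numTasks: Int): Unit = {}
      // assumed: a shared in-memory buffer read by the assertions
      override def writeRecord(record: Row): Unit =
        MemoryTableSourceSinkUtil.tableDataStrings += record.toString
      override def close(): Unit = {}
    }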
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala
index b1faaad..4cf4969 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala
@@ -23,8 +23,9 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.{DataTypes, TableSchema, Types}
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.{StreamingTestBase, TestData, TestingAppendSink}
-import org.apache.flink.table.planner.utils.{TestFilterableTableSource, TestInputFormatTableSource, TestNestedProjectableTableSource, TestPartitionableTableSource, TestProjectableTableSource, TestTableSources}
+import org.apache.flink.table.planner.utils.{TestDataTypeTableSource, TestFilterableTableSource, TestInputFormatTableSource, TestNestedProjectableTableSource, TestPartitionableTableSource, TestProjectableTableSource, TestStreamTableSource, TestTableSources}
 import org.apache.flink.types.Row
 
 import org.junit.Assert._
@@ -402,4 +403,70 @@ class TableSourceITCase extends StreamingTestBase {
     )
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
+
+  @Test
+  def testDecimalSource(): Unit = {
+    val tableSchema = TableSchema.builder().fields(
+      Array("a", "b", "c", "d"),
+      Array(
+        DataTypes.INT(),
+        DataTypes.DECIMAL(5, 2),
+        DataTypes.VARCHAR(5),
+        DataTypes.CHAR(5))).build()
+    val tableSource = new TestDataTypeTableSource(
+      tableSchema,
+      Seq(
+        row(1, new java.math.BigDecimal(5.1), "1", "1"),
+        row(2, new java.math.BigDecimal(6.1), "12", "12"),
+        row(3, new java.math.BigDecimal(7.1), "123", "123")
+      ))
+    tEnv.registerTableSource("MyInputFormatTable", tableSource)
+
+    val sink = new TestingAppendSink()
+    tEnv.sqlQuery("SELECT a, b, c, d FROM MyInputFormatTable").toAppendStream[Row].addSink(sink)
+
+    env.execute()
+
+    val expected = Seq(
+      "1,5.10,1,1",
+      "2,6.10,12,12",
+      "3,7.10,123,123"
+    )
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  /**
+    * A StreamTableSource must use the type information of the DataStream, so it
+    * loses precision. Only decimals with the default precision are supported.
+    */
+  @Test
+  def testLegacyDecimalSourceUsingStreamTableSource(): Unit = {
+    val tableSchema = new TableSchema(
+      Array("a", "b", "c"),
+      Array(
+        Types.INT(),
+        Types.DECIMAL(),
+        Types.STRING()
+      ))
+    val tableSource = new TestStreamTableSource(
+      tableSchema,
+      Seq(
+        row(1, new java.math.BigDecimal(5.1), "1"),
+        row(2, new java.math.BigDecimal(6.1), "12"),
+        row(3, new java.math.BigDecimal(7.1), "123")
+      ))
+    tEnv.registerTableSource("MyInputFormatTable", tableSource)
+
+    val sink = new TestingAppendSink()
+    tEnv.sqlQuery("SELECT a, b, c FROM MyInputFormatTable").toAppendStream[Row].addSink(sink)
+
+    env.execute()
+
+    val expected = Seq(
+      "1,5.099999999999999645,1",
+      "2,6.099999999999999645,12",
+      "3,7.099999999999999645,123"
+    )
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
 }
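
The expected "5.099999999999999645" in the legacy test is consistent with the
following: Types.DECIMAL() is a legacy TypeInformation that carries no precision
or scale, so the planner falls back to its default decimal type, and the
double-constructed BigDecimal is only brought to the default scale. Assuming the
default is DECIMAL(38, 18) with HALF_UP rounding (an assumption that matches the
expected output), plain JDK arithmetic reproduces the strings:

    new java.math.BigDecimal(5.1).setScale(18, java.math.RoundingMode.HALF_UP)
    // => 5.099999999999999645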
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
index c013308..c889149 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
@@ -23,13 +23,15 @@ import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{TableException, Tumble, Types}
+import org.apache.flink.table.api.{DataTypes, TableException, TableSchema, Tumble, Types}
 import org.apache.flink.table.planner.runtime.utils.TestData.{smallTupleData3, tupleData3, tupleData5}
 import org.apache.flink.table.planner.runtime.utils.{TestingAppendTableSink, TestingRetractTableSink, TestingUpsertTableSink}
+import org.apache.flink.table.planner.utils.MemoryTableSourceSinkUtil.{DataTypeAppendStreamTableSink, DataTypeOutputFormatTableSink}
 import org.apache.flink.table.planner.utils.{MemoryTableSourceSinkUtil, TableTestUtil}
 import org.apache.flink.table.sinks._
 import org.apache.flink.test.util.{AbstractTestBase, TestBaseUtils}
 import org.apache.flink.types.Row
+
 import org.junit.Assert._
 import org.junit.Test
 
@@ -550,4 +552,33 @@ class TableSinkITCase extends AbstractTestBase {
 
     r.toRetractStream[Row]
   }
+
+  @Test
+  def testDecimalAppendStreamTableSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
+
+    MemoryTableSourceSinkUtil.clear()
+
+    val schema = TableSchema.builder()
+        .field("c", DataTypes.VARCHAR(5))
+        .field("b", DataTypes.DECIMAL(10, 0))
+        .field("d", DataTypes.CHAR(5))
+        .build()
+    val sink = new DataTypeAppendStreamTableSink(schema)
+    tEnv.registerTableSink("testSink", sink)
+
+    env.fromCollection(tupleData3)
+        .toTable(tEnv, 'a, 'b, 'c)
+        .where('a > 20)
+        .select("12345", 55.cast(DataTypes.DECIMAL(10, 0)), "12345".cast(DataTypes.CHAR(5)))
+        .insertInto("testSink")
+
+    tEnv.execute("")
+
+    val results = MemoryTableSourceSinkUtil.tableDataStrings.asJava
+    val expected = Seq("12345,55,12345").mkString("\n")
+
+    TestBaseUtils.compareResultAsText(results, expected)
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/MemoryTableSourceSinkUtil.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/MemoryTableSourceSinkUtil.scala
index e64dd21..0d263f6 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/MemoryTableSourceSinkUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/MemoryTableSourceSinkUtil.scala
@@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont
 import org.apache.flink.table.api.TableSchema
 import org.apache.flink.table.sinks.{AppendStreamTableSink, OutputFormatTableSink, TableSink, TableSinkBase}
 import org.apache.flink.table.sources._
+import org.apache.flink.table.types.DataType
 import org.apache.flink.table.util.TableConnectorUtil
 import org.apache.flink.types.Row
 
@@ -163,4 +164,34 @@ object MemoryTableSourceSinkUtil {
 
     override def close(): Unit = {}
   }
+
+  final class DataTypeOutputFormatTableSink(
+      schema: TableSchema) extends OutputFormatTableSink[Row] {
+
+    override def getConsumedDataType: DataType = schema.toRowDataType
+
+    override def getOutputFormat: OutputFormat[Row] = new MemoryCollectionOutputFormat
+
+    override def getTableSchema: TableSchema = schema
+
+    override def configure(
+        fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = this
+  }
+
+  final class DataTypeAppendStreamTableSink(
+      schema: TableSchema) extends AppendStreamTableSink[Row] {
+
+    override def getConsumedDataType: DataType = schema.toRowDataType
+
+    override def getTableSchema: TableSchema = schema
+
+    override def configure(
+        fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = this
+
+    override def consumeDataStream(dataStream: DataStream[Row]): DataStreamSink[_] = {
+      dataStream.writeUsingOutputFormat(new MemoryCollectionOutputFormat)
+    }
+
+    override def emitDataStream(dataStream: DataStream[Row]): Unit = ???
+  }
 }
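
The essential piece in both sinks is getConsumedDataType returning
schema.toRowDataType: the planner receives a precision-carrying DataType instead
of a TypeInformation, so the declared attributes reach the sink. For the schema
used in the sink tests:

    val schema = TableSchema.builder()
        .field("c", DataTypes.VARCHAR(5))
        .field("b", DataTypes.DECIMAL(10, 0))
        .field("d", DataTypes.CHAR(5))
        .build()

    // schema.toRowDataType describes ROW<c VARCHAR(5), b DECIMAL(10, 0), d CHAR(5)>,
    // so values arrive at the sink with the declared precision intact

Leaving emitDataStream at ??? looks deliberate: the blink planner is expected to
call the newer consumeDataStream instead, so the deprecated method should never
be reached in these tests.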
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala
index d2dcb1f..44cb4eb 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner.utils
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.io.InputFormat
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.io.CollectionInputFormat
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.core.io.InputSplit
@@ -33,6 +34,7 @@ import org.apache.flink.table.functions.BuiltInFunctionDefinitions
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.TimeTestUtil.EventTimeSourceFunction
+import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
 import org.apache.flink.table.sources._
 import org.apache.flink.table.sources.tsextractors.ExistingField
 import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, PreserveWatermarks}
@@ -620,3 +622,36 @@ class TestInputFormatTableSource[T](
 
   override def getTableSchema: TableSchema = tableSchema
 }
+
+class TestDataTypeTableSource(
+    tableSchema: TableSchema,
+    values: Seq[Row]) extends InputFormatTableSource[Row] {
+
+  override def getInputFormat: InputFormat[Row, _ <: InputSplit] = {
+    new CollectionInputFormat[Row](
+      values.asJava,
+      fromDataTypeToTypeInfo(getProducedDataType)
+          .createSerializer(new ExecutionConfig)
+          .asInstanceOf[TypeSerializer[Row]])
+  }
+
+  override def getReturnType: TypeInformation[Row] =
+    throw new RuntimeException("Should not invoke this deprecated method.")
+
+  override def getProducedDataType: DataType = tableSchema.toRowDataType
+
+  override def getTableSchema: TableSchema = tableSchema
+}
+
+class TestStreamTableSource(
+    tableSchema: TableSchema,
+    values: Seq[Row]) extends StreamTableSource[Row] {
+
+  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
+    execEnv.fromCollection(values, tableSchema.toRowType)
+  }
+
+  override def getReturnType: TypeInformation[Row] = tableSchema.toRowType
+
+  override def getTableSchema: TableSchema = tableSchema
+}
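
The two sources differ only in which type contract they expose, which is exactly
what the stream IT cases exercise: TestDataTypeTableSource goes through
getProducedDataType and keeps DECIMAL(5, 2), VARCHAR(5) and CHAR(5) intact, while
TestStreamTableSource answers with tableSchema.toRowType, a TypeInformation that
cannot carry precision, producing the default-precision decimals asserted in
testLegacyDecimalSourceUsingStreamTableSource. Side by side:

    // precision-preserving path (new type system)
    override def getProducedDataType: DataType = tableSchema.toRowDataType

    // legacy path: TypeInformation has no precision/scale for decimals,
    // so DECIMAL(5, 2) degrades to the planner's default decimal type
    override def getReturnType: TypeInformation[Row] = tableSchema.toRowType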