Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/07 14:37:12 UTC

[flink] branch master updated (8b3430b -> 5b90b16)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 8b3430b  [FLINK-13605][tests] Fix unstable case AsyncDataStreamITCase#testUnorderedWait
     new 49202e2  [FLINK-13495][table-api] Use DataType in ConnectorCatalogTable instead of TypeInformation
     new ebf18db  [FLINK-13495][table-planner-blink] Deal with InputFormatTableSource in planner to use planner type convertion to keep precision
     new 9dddae8  [FLINK-13495][table-planner-blink] Introduce isAssignable to use soft check in TableSourceUtil
     new e2a8b95  [FLINK-13495][table-planner-blink] source/sink code should use planner type conversion to keep precision
     new 5b90b16  [FLINK-13495][table-planner-blink] Add table source and table sink it case using varchar/char/decimal precision

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/table/catalog/ConnectorCatalogTable.java | 22 ++++--
 .../nodes/physical/PhysicalTableSourceScan.scala   | 18 ++++-
 .../plan/nodes/physical/batch/BatchExecSink.scala  |  5 +-
 .../physical/batch/BatchExecTableSourceScan.scala  |  6 +-
 .../nodes/physical/stream/StreamExecSink.scala     |  5 +-
 .../stream/StreamExecTableSourceScan.scala         |  6 +-
 .../table/planner/sources/TableSourceUtil.scala    |  9 ++-
 .../runtime/batch/sql/TableSourceITCase.scala      | 28 ++++++-
 .../runtime/batch/table/TableSinkITCase.scala      | 88 ++++++++++++++++++++++
 .../runtime/stream/sql/TableSourceITCase.scala     | 69 ++++++++++++++++-
 .../runtime/stream/table/TableSinkITCase.scala     | 33 +++++++-
 .../planner/utils/MemoryTableSourceSinkUtil.scala  | 31 ++++++++
 .../table/planner/utils/testTableSources.scala     | 35 +++++++++
 .../table/dataformat/DataFormatConverters.java     |  4 +-
 .../table/runtime/types/PlannerTypeUtils.java      | 44 +++++++++++
 .../runtime/types/LogicalTypeAssignableTest.java}  | 45 +++++------
 16 files changed, 395 insertions(+), 53 deletions(-)
 create mode 100644 flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/TableSinkITCase.scala
 copy flink-table/{flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCompatibleCheckTest.java => flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/types/LogicalTypeAssignableTest.java} (85%)


[flink] 03/05: [FLINK-13495][table-planner-blink] Introduce isAssignable to use soft check in TableSourceUtil

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9dddae827e3f117b3527254701119c10909a3ac5
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Fri Aug 2 18:06:58 2019 +0200

    [FLINK-13495][table-planner-blink] Introduce isAssignable to use soft check in TableSourceUtil
---
 .../table/planner/sources/TableSourceUtil.scala    |   3 +-
 .../table/runtime/types/PlannerTypeUtils.java      |  44 +++
 .../runtime/types/LogicalTypeAssignableTest.java   | 301 +++++++++++++++++++++
 3 files changed, 347 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
index 2f15be2..bbbc06d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
@@ -27,6 +27,7 @@ import org.apache.flink.table.functions.BuiltInFunctionDefinitions
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.expressions.RexNodeConverter
 import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter
+import org.apache.flink.table.runtime.types.PlannerTypeUtils.isAssignable
 import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
 import org.apache.flink.table.sources.{DefinedFieldMapping, DefinedProctimeAttribute, DefinedRowtimeAttributes, RowtimeAttributeDescriptor, TableSource}
 import org.apache.flink.table.types.logical.{LogicalType, TimestampKind, TimestampType, TinyIntType}
@@ -116,7 +117,7 @@ object TableSourceUtil {
 
         val (physicalName, idx, tpe) = resolveInputField(name, tableSource)
         // validate that mapped fields are are same type
-        if (fromTypeInfoToLogicalType(tpe) != t) {
+        if (!isAssignable(fromTypeInfoToLogicalType(tpe), t)) {
           throw new ValidationException(s"Type $t of table field '$name' does not " +
             s"match with type $tpe of the field '$physicalName' of the TableSource return type.")
         }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/PlannerTypeUtils.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/PlannerTypeUtils.java
index bdb1134..77d346b 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/PlannerTypeUtils.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/PlannerTypeUtils.java
@@ -110,4 +110,48 @@ public class PlannerTypeUtils {
 				return t1.copy(true).equals(t2.copy(true));
 		}
 	}
+
+	/**
+	 * The conversion from DataType to TypeInformation may currently lose information about
+	 * nullability and precision, so this method performs a soft check.
+	 *
+	 * <p>The difference to {@link #isInteroperable} is that decimal precision is ignored.
+	 *
+	 * <p>Timestamp precision is not ignored, because only one timestamp precision is supported for now.
+	 */
+	public static boolean isAssignable(LogicalType t1, LogicalType t2) {
+		// Soft check for CharType: it is converted to a String TypeInformation and loses the char length information.
+		if (t1.getTypeRoot().getFamilies().contains(CHARACTER_STRING) &&
+				t2.getTypeRoot().getFamilies().contains(CHARACTER_STRING)) {
+			return true;
+		}
+		if (t1.getTypeRoot().getFamilies().contains(BINARY_STRING) &&
+				t2.getTypeRoot().getFamilies().contains(BINARY_STRING)) {
+			return true;
+		}
+		if (t1.getTypeRoot() != t2.getTypeRoot()) {
+			return false;
+		}
+
+		switch (t1.getTypeRoot()) {
+			case DECIMAL:
+				return true;
+			default:
+				if (t1.getChildren().isEmpty()) {
+					return t1.copy(true).equals(t2.copy(true));
+				} else {
+					List<LogicalType> children1 = t1.getChildren();
+					List<LogicalType> children2 = t2.getChildren();
+					if (children1.size() != children2.size()) {
+						return false;
+					}
+					for (int i = 0; i < children1.size(); i++) {
+						if (!isAssignable(children1.get(i), children2.get(i))) {
+							return false;
+						}
+					}
+					return true;
+				}
+		}
+	}
 }
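
For reference, a minimal standalone sketch of how the new soft check behaves (the class name is made up for illustration; the calls mirror a few of the parameterized cases in the test added below):

import org.apache.flink.table.runtime.types.PlannerTypeUtils;
import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarCharType;

public class IsAssignableSketch {
	public static void main(String[] args) {
		// CHAR and VARCHAR belong to the same character-string family, so length is ignored.
		System.out.println(PlannerTypeUtils.isAssignable(new CharType(), new VarCharType(5)));          // true
		// DECIMAL precision and scale are ignored by the soft check.
		System.out.println(PlannerTypeUtils.isAssignable(new DecimalType(), new DecimalType(10, 2)));   // true
		// Timestamp precision is not ignored, because only one precision is supported right now.
		System.out.println(PlannerTypeUtils.isAssignable(new TimestampType(9), new TimestampType(3)));  // false
	}
}
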
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/types/LogicalTypeAssignableTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/types/LogicalTypeAssignableTest.java
new file mode 100644
index 0000000..af67f51
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/types/LogicalTypeAssignableTest.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.types;
+
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.TypeInformationAnyType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link PlannerTypeUtils#isAssignable(LogicalType, LogicalType)}.
+ */
+@RunWith(Parameterized.class)
+public class LogicalTypeAssignableTest {
+
+	@Parameters(name = "{index}: {0} COMPATIBLE {1} => {2}")
+	public static List<Object[]> testData() {
+		return Arrays.asList(
+			new Object[][]{
+				{new CharType(), new CharType(5), true},
+
+				{new CharType(), new VarCharType(5), true},
+
+				{new VarCharType(), new VarCharType(33), true},
+
+				{new BooleanType(), new BooleanType(false), true},
+
+				{new BinaryType(), new BinaryType(22), true},
+
+				{new VarBinaryType(), new VarBinaryType(44), true},
+
+				{new DecimalType(), new DecimalType(10, 2), true},
+
+				{new TinyIntType(), new TinyIntType(false), true},
+
+				{new SmallIntType(), new SmallIntType(false), true},
+
+				{new IntType(), new IntType(false), true},
+
+				{new BigIntType(), new BigIntType(false), true},
+
+				{new FloatType(), new FloatType(false), true},
+
+				{new DoubleType(), new DoubleType(false), true},
+
+				{new DateType(), new DateType(false), true},
+
+				{new TimeType(), new TimeType(9), false},
+
+				{new TimestampType(9), new TimestampType(3), false},
+
+				{new ZonedTimestampType(9), new ZonedTimestampType(3), false},
+
+				{new ZonedTimestampType(false, TimestampKind.PROCTIME, 9), new ZonedTimestampType(3), false},
+
+				{
+					new YearMonthIntervalType(YearMonthIntervalType.YearMonthResolution.YEAR_TO_MONTH, 2),
+					new YearMonthIntervalType(YearMonthIntervalType.YearMonthResolution.MONTH),
+					false
+				},
+
+				{
+					new DayTimeIntervalType(DayTimeIntervalType.DayTimeResolution.DAY_TO_SECOND, 2, 6),
+					new DayTimeIntervalType(DayTimeIntervalType.DayTimeResolution.DAY_TO_SECOND, 2, 7),
+					false
+				},
+
+				{
+					new ArrayType(new TimestampType()),
+					new ArrayType(new SmallIntType()),
+					false,
+				},
+
+				{
+					new MultisetType(new TimestampType()),
+					new MultisetType(new SmallIntType()),
+					false
+				},
+
+				{
+					new MapType(new VarCharType(20), new TimestampType()),
+					new MapType(new VarCharType(99), new TimestampType()),
+					true
+				},
+
+				{
+					new RowType(
+						Arrays.asList(
+							new RowType.RowField("a", new VarCharType()),
+							new RowType.RowField("b", new VarCharType()),
+							new RowType.RowField("c", new VarCharType()),
+							new RowType.RowField("d", new TimestampType()))),
+					new RowType(
+						Arrays.asList(
+							new RowType.RowField("_a", new VarCharType()),
+							new RowType.RowField("_b", new VarCharType()),
+							new RowType.RowField("_c", new VarCharType()),
+							new RowType.RowField("_d", new TimestampType()))),
+					// field name doesn't matter
+					true
+				},
+
+				{
+					new RowType(
+						Arrays.asList(
+							new RowField("f1", new IntType()),
+							new RowField("f2", new VarCharType())
+						)
+					),
+					new RowType(
+						Arrays.asList(
+							new RowField("f1", new IntType()),
+							new RowField("f2", new BooleanType())
+						)
+					),
+					false
+				},
+
+				{
+					new ArrayType(
+						new RowType(
+							Arrays.asList(
+								new RowField("f1", new IntType()),
+								new RowField("f2", new IntType())
+							)
+						)
+					),
+					new ArrayType(
+						new RowType(
+							Arrays.asList(
+								new RowField("f3", new IntType()),
+								new RowField("f4", new IntType())
+							)
+						)
+					),
+					true
+				},
+
+				{
+					new MapType(
+						new IntType(),
+						new RowType(
+							Arrays.asList(
+								new RowField("f1", new IntType()),
+								new RowField("f2", new IntType())
+							)
+						)
+					),
+					new MapType(
+						new IntType(),
+						new RowType(
+							Arrays.asList(
+								new RowField("f3", new IntType()),
+								new RowField("f4", new IntType())
+							)
+						)
+					),
+					true
+				},
+
+				{
+					new MultisetType(
+						new RowType(
+							Arrays.asList(
+								new RowField("f1", new IntType()),
+								new RowField("f2", new IntType())
+							)
+						)
+					),
+					new MultisetType(
+						new RowType(
+							Arrays.asList(
+								new RowField("f1", new IntType()),
+								new RowField("f2", new IntType())
+							)
+						)
+					),
+					true
+				},
+
+				{
+					new TypeInformationAnyType<>(Types.GENERIC(PlannerTypeUtils.class)),
+					new TypeInformationAnyType<>(Types.GENERIC(Object.class)),
+					false
+				},
+
+				{
+					createUserType(new IntType(), new VarCharType()),
+					createUserType(new IntType(), new VarCharType()),
+					true
+				},
+
+				{
+					createDistinctType(new DecimalType(10, 2)),
+					createDistinctType(new DecimalType(10, 2)),
+					true
+				}
+			}
+		);
+	}
+
+	@Parameter
+	public LogicalType sourceType;
+
+	@Parameter(1)
+	public LogicalType targetType;
+
+	@Parameter(2)
+	public boolean equals;
+
+	@Test
+	public void testAreTypesCompatible() {
+		assertThat(
+			PlannerTypeUtils.isAssignable(sourceType, targetType),
+			equalTo(equals));
+		assertTrue(PlannerTypeUtils.isAssignable(sourceType, sourceType.copy()));
+		assertTrue(PlannerTypeUtils.isAssignable(targetType, targetType.copy()));
+	}
+
+	private static DistinctType createDistinctType(LogicalType sourceType) {
+		return new DistinctType.Builder(
+			ObjectIdentifier.of("cat", "db", UUID.randomUUID().toString()),
+			sourceType)
+			.setDescription("Money type desc.")
+			.build();
+	}
+
+	private static StructuredType createUserType(LogicalType... children) {
+		return new StructuredType.Builder(
+			ObjectIdentifier.of("cat", "db", "User"),
+			Arrays.stream(children).map(lt ->
+				new StructuredType.StructuredAttribute(UUID.randomUUID().toString(), lt))
+				.collect(Collectors.toList()))
+			.setDescription("User type desc.")
+			.setFinal(true)
+			.setInstantiable(true)
+			.setImplementationClass(User.class)
+			.build();
+	}
+
+	private static final class User {
+		public int setting;
+	}
+}


[flink] 02/05: [FLINK-13495][table-planner-blink] Deal with InputFormatTableSource in planner to use planner type convertion to keep precision

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ebf18db29b9b8480459e20b307e07980657e75e5
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Fri Aug 2 18:05:14 2019 +0200

    [FLINK-13495][table-planner-blink] Deal with InputFormatTableSource in planner to use planner type convertion to keep precision
---
 .../plan/nodes/physical/PhysicalTableSourceScan.scala  | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/PhysicalTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/PhysicalTableSourceScan.scala
index aa85e2c..50fa428 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/PhysicalTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/PhysicalTableSourceScan.scala
@@ -18,11 +18,15 @@
 
 package org.apache.flink.table.planner.plan.nodes.physical
 
+import org.apache.flink.api.common.io.InputFormat
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.dag.Transformation
+import org.apache.flink.core.io.InputSplit
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.schema.{FlinkRelOptTable, TableSourceTable}
-import org.apache.flink.table.sources.{StreamTableSource, TableSource}
+import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
+import org.apache.flink.table.sources.{InputFormatTableSource, StreamTableSource, TableSource}
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelWriter
@@ -60,8 +64,16 @@ abstract class PhysicalTableSourceScan(
   def getSourceTransformation(
       streamEnv: StreamExecutionEnvironment): Transformation[_] = {
     if (sourceTransform == null) {
-      sourceTransform = tableSource.asInstanceOf[StreamTableSource[_]].
-        getDataStream(streamEnv).getTransformation
+      sourceTransform = tableSource match {
+        case format: InputFormatTableSource[_] =>
+          // We don't use InputFormatTableSource.getDataStream here, because the planner type
+          // conversion is needed to keep the precision of VARCHAR, DECIMAL, and similar types.
+          streamEnv.createInput(
+            format.getInputFormat.asInstanceOf[InputFormat[Any, _ <: InputSplit]],
+            fromDataTypeToTypeInfo(format.getProducedDataType).asInstanceOf[TypeInformation[Any]]
+          ).name(format.explainSource()).getTransformation
+        case s: StreamTableSource[_] => s.getDataStream(streamEnv).getTransformation
+      }
     }
     sourceTransform
   }
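
To illustrate what that branch amounts to, here is a rough standalone sketch (class name made up; an illustration under the assumptions of this commit, not planner code): the source transformation is built with createInput and the blink DataType-to-TypeInformation conversion, the same way the TestDataTypeTableSource added later in this series is consumed, so DECIMAL(5, 2) and VARCHAR(5) columns keep their declared precision.

import java.math.BigDecimal;
import java.util.Arrays;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.CollectionInputFormat;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
import org.apache.flink.types.Row;

public class PreciseInputFormatSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// The produced data type declares exact precisions instead of generic String/BigDecimal.
		TableSchema schema = TableSchema.builder()
				.field("b", DataTypes.DECIMAL(5, 2))
				.field("c", DataTypes.VARCHAR(5))
				.build();

		// Planner type conversion: DataType -> TypeInformation, keeping precision.
		@SuppressWarnings("unchecked")
		TypeInformation<Row> typeInfo = (TypeInformation<Row>)
				TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(schema.toRowDataType());

		CollectionInputFormat<Row> inputFormat = new CollectionInputFormat<>(
				Arrays.asList(Row.of(new BigDecimal("5.1"), "1"), Row.of(new BigDecimal("6.1"), "12")),
				typeInfo.createSerializer(new ExecutionConfig()));

		// The InputFormatTableSource branch above does essentially this instead of calling getDataStream().
		env.createInput(inputFormat, typeInfo).print();
		env.execute("precision sketch");
	}
}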


[flink] 04/05: [FLINK-13495][table-planner-blink] source/sink code should use planner type conversion to keep precision

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e2a8b95e4703e77a963cee5641bffe2e5b8a6f49
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Fri Aug 2 18:09:06 2019 +0200

    [FLINK-13495][table-planner-blink] source/sink code should use planner type conversion to keep precision
---
 .../table/planner/plan/nodes/physical/batch/BatchExecSink.scala     | 5 +++--
 .../plan/nodes/physical/batch/BatchExecTableSourceScan.scala        | 6 +++---
 .../table/planner/plan/nodes/physical/stream/StreamExecSink.scala   | 5 ++---
 .../plan/nodes/physical/stream/StreamExecTableSourceScan.scala      | 6 +++---
 .../org/apache/flink/table/planner/sources/TableSourceUtil.scala    | 6 +++---
 .../org/apache/flink/table/dataformat/DataFormatConverters.java     | 4 ++--
 6 files changed, 16 insertions(+), 16 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSink.scala
index 2b4e181..a64eac1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSink.scala
@@ -30,7 +30,8 @@ import org.apache.flink.table.planner.delegation.BatchPlanner
 import org.apache.flink.table.planner.plan.nodes.calcite.Sink
 import org.apache.flink.table.planner.plan.nodes.exec.{BatchExecNode, ExecNode}
 import org.apache.flink.table.planner.sinks.DataStreamTableSink
-import org.apache.flink.table.runtime.types.ClassLogicalTypeConverter
+import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
+import org.apache.flink.table.runtime.types.{ClassLogicalTypeConverter, TypeInfoDataTypeConverter}
 import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
 import org.apache.flink.table.sinks.{RetractStreamTableSink, StreamTableSink, TableSink, UpsertStreamTableSink}
 import org.apache.flink.table.types.DataType
@@ -118,7 +119,7 @@ class BatchExecSink[T](
       planner: BatchPlanner): Transformation[T] = {
     val config = planner.getTableConfig
     val resultDataType = sink.getConsumedDataType
-    val resultType = fromDataTypeToLegacyInfo(resultDataType)
+    val resultType = fromDataTypeToTypeInfo(resultDataType)
     validateType(resultDataType)
     val inputNode = getInputNodes.get(0)
     inputNode match {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
index 99f1b7c..5ab23d3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
@@ -29,8 +29,8 @@ import org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceSca
 import org.apache.flink.table.planner.plan.schema.FlinkRelOptTable
 import org.apache.flink.table.planner.plan.utils.ScanUtil
 import org.apache.flink.table.planner.sources.TableSourceUtil
+import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter
 import org.apache.flink.table.sources.StreamTableSource
-import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
 
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
@@ -90,11 +90,11 @@ class BatchExecTableSourceScan(
       isStreamTable = false,
       tableSourceTable.selectedFields)
 
-    val inputDataType = fromLegacyInfoToDataType(inputTransform.getOutputType)
+    val inputDataType = inputTransform.getOutputType
     val producedDataType = tableSource.getProducedDataType
 
     // check that declared and actual type of table source DataStream are identical
-    if (inputDataType != producedDataType) {
+    if (inputDataType != TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(producedDataType)) {
       throw new TableException(s"TableSource of type ${tableSource.getClass.getCanonicalName} " +
         s"returned a DataStream of data type $producedDataType that does not match with the " +
         s"data type $producedDataType declared by the TableSource.getProducedDataType() method. " +
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala
index ea50bfe..9140644 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala
@@ -30,13 +30,12 @@ import org.apache.flink.table.planner.delegation.StreamPlanner
 import org.apache.flink.table.planner.plan.`trait`.{AccMode, AccModeTraitDef}
 import org.apache.flink.table.planner.plan.nodes.calcite.Sink
 import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, StreamExecNode}
-import org.apache.flink.table.planner.plan.nodes.resource.NodeResourceUtil
 import org.apache.flink.table.planner.plan.utils.UpdatingPlanChecker
 import org.apache.flink.table.planner.sinks.DataStreamTableSink
+import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter
 import org.apache.flink.table.runtime.typeutils.{BaseRowTypeInfo, TypeCheckUtils}
 import org.apache.flink.table.sinks._
 import org.apache.flink.table.types.logical.TimestampType
-import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelNode
@@ -212,7 +211,7 @@ class StreamExecSink[T](
       parTransformation.getOutputType
     }
     val resultDataType = sink.getConsumedDataType
-    val resultType = fromDataTypeToLegacyInfo(resultDataType)
+    val resultType = TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(resultDataType)
     if (CodeGenUtils.isInternalClass(resultDataType)) {
       parTransformation.asInstanceOf[Transformation[T]]
     } else {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
index a4376fb..746aad7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
@@ -34,9 +34,9 @@ import org.apache.flink.table.planner.plan.schema.FlinkRelOptTable
 import org.apache.flink.table.planner.plan.utils.ScanUtil
 import org.apache.flink.table.planner.sources.TableSourceUtil
 import org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator
+import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter
 import org.apache.flink.table.sources.wmstrategies.{PeriodicWatermarkAssigner, PreserveWatermarks, PunctuatedWatermarkAssigner}
 import org.apache.flink.table.sources.{RowtimeAttributeDescriptor, StreamTableSource}
-import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
 import org.apache.flink.table.types.{DataType, FieldsDataType}
 import org.apache.flink.types.Row
 
@@ -102,11 +102,11 @@ class StreamExecTableSourceScan(
       isStreamTable = true,
       tableSourceTable.selectedFields)
 
-    val inputDataType = fromLegacyInfoToDataType(inputTransform.getOutputType)
+    val inputDataType = inputTransform.getOutputType
     val producedDataType = tableSource.getProducedDataType
 
     // check that declared and actual type of table source DataStream are identical
-    if (inputDataType != producedDataType) {
+    if (inputDataType != TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(producedDataType)) {
       throw new TableException(s"TableSource of type ${tableSource.getClass.getCanonicalName} " +
         s"returned a DataStream of data type $producedDataType that does not match with the " +
         s"data type $producedDataType declared by the TableSource.getProducedDataType() method. " +
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
index bbbc06d..489e3e2 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
@@ -28,10 +28,10 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.expressions.RexNodeConverter
 import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter
 import org.apache.flink.table.runtime.types.PlannerTypeUtils.isAssignable
+import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
 import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
 import org.apache.flink.table.sources.{DefinedFieldMapping, DefinedProctimeAttribute, DefinedRowtimeAttributes, RowtimeAttributeDescriptor, TableSource}
 import org.apache.flink.table.types.logical.{LogicalType, TimestampKind, TimestampType, TinyIntType}
-import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 
 import com.google.common.collect.ImmutableList
@@ -123,7 +123,7 @@ object TableSourceUtil {
         }
         idx
     }
-    val inputType = fromDataTypeToLegacyInfo(tableSource.getProducedDataType)
+    val inputType = fromDataTypeToTypeInfo(tableSource.getProducedDataType)
 
     // ensure that only one field is mapped to an atomic type
     if (!inputType.isInstanceOf[CompositeType[_]] && mapping.count(_ >= 0) > 1) {
@@ -328,7 +328,7 @@ object TableSourceUtil {
       fieldName: String,
       tableSource: TableSource[_]): (String, Int, TypeInformation[_]) = {
 
-    val returnType = fromDataTypeToLegacyInfo(tableSource.getProducedDataType)
+    val returnType = fromDataTypeToTypeInfo(tableSource.getProducedDataType)
 
     /** Look up a field by name in a type information */
     def lookupField(fieldName: String, failMsg: String): (String, Int, TypeInformation[_]) = {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
index de29de7..b6e6bb2 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
@@ -65,7 +65,7 @@ import java.util.stream.Stream;
 
 import scala.Product;
 
-import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo;
+import static org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo;
 import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
 
 /**
@@ -209,7 +209,7 @@ public class DataFormatConverters {
 						DataTypes.INT().bridgedTo(Integer.class));
 			case ROW:
 			case STRUCTURED_TYPE:
-				CompositeType compositeType = (CompositeType) fromDataTypeToLegacyInfo(dataType);
+				CompositeType compositeType = (CompositeType) fromDataTypeToTypeInfo(dataType);
 				DataType[] fieldTypes = Stream.iterate(0, x -> x + 1).limit(compositeType.getArity())
 						.map((Function<Integer, TypeInformation>) compositeType::getTypeAt)
 						.map(TypeConversions::fromLegacyInfoToDataType).toArray(DataType[]::new);


[flink] 01/05: [FLINK-13495][table-api] Use DataType in ConnectorCatalogTable instead of TypeInformation

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 49202e2cf849dce7df3856ec9f40b1b5596a7ae9
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Fri Aug 2 18:02:22 2019 +0200

    [FLINK-13495][table-api] Use DataType in ConnectorCatalogTable instead of TypeInformation
---
 .../flink/table/catalog/ConnectorCatalogTable.java | 22 ++++++++++++++--------
 1 file changed, 14 insertions(+), 8 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
index a3f7bda..1ea8b6f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
@@ -20,14 +20,16 @@ package org.apache.flink.table.catalog;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.DefinedProctimeAttribute;
 import org.apache.flink.table.sources.DefinedRowtimeAttributes;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.TableSource;
-import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -121,7 +123,7 @@ public class ConnectorCatalogTable<T1, T2> extends AbstractCatalogTable {
 			return tableSchema;
 		}
 
-		TypeInformation[] types = Arrays.copyOf(tableSchema.getFieldTypes(), tableSchema.getFieldCount());
+		DataType[] types = Arrays.copyOf(tableSchema.getFieldDataTypes(), tableSchema.getFieldCount());
 		String[] fieldNames = tableSchema.getFieldNames();
 		if (source instanceof DefinedRowtimeAttributes) {
 			updateRowtimeIndicators((DefinedRowtimeAttributes) source, fieldNames, types);
@@ -129,13 +131,13 @@ public class ConnectorCatalogTable<T1, T2> extends AbstractCatalogTable {
 		if (source instanceof DefinedProctimeAttribute) {
 			updateProctimeIndicator((DefinedProctimeAttribute) source, fieldNames, types);
 		}
-		return new TableSchema(fieldNames, types);
+		return TableSchema.builder().fields(fieldNames, types).build();
 	}
 
 	private static void updateRowtimeIndicators(
 			DefinedRowtimeAttributes source,
 			String[] fieldNames,
-			TypeInformation[] types) {
+			DataType[] types) {
 		List<String> rowtimeAttributes = source.getRowtimeAttributeDescriptors()
 			.stream()
 			.map(RowtimeAttributeDescriptor::getAttributeName)
@@ -143,7 +145,9 @@ public class ConnectorCatalogTable<T1, T2> extends AbstractCatalogTable {
 
 		for (int i = 0; i < fieldNames.length; i++) {
 			if (rowtimeAttributes.contains(fieldNames[i])) {
-				types[i] = TimeIndicatorTypeInfo.ROWTIME_INDICATOR;
+				// bridged to java.sql.Timestamp for compatibility with the legacy flink-planner
+				types[i] = new AtomicDataType(new TimestampType(true, TimestampKind.ROWTIME, 3))
+						.bridgedTo(java.sql.Timestamp.class);
 			}
 		}
 	}
@@ -151,12 +155,14 @@ public class ConnectorCatalogTable<T1, T2> extends AbstractCatalogTable {
 	private static void updateProctimeIndicator(
 			DefinedProctimeAttribute source,
 			String[] fieldNames,
-			TypeInformation[] types) {
+			DataType[] types) {
 		String proctimeAttribute = source.getProctimeAttribute();
 
 		for (int i = 0; i < fieldNames.length; i++) {
 			if (fieldNames[i].equals(proctimeAttribute)) {
-				types[i] = TimeIndicatorTypeInfo.PROCTIME_INDICATOR;
+				// bridged to java.sql.Timestamp for compatibility with the legacy flink-planner
+				types[i] = new AtomicDataType(new TimestampType(true, TimestampKind.PROCTIME, 3))
+						.bridgedTo(java.sql.Timestamp.class);
 				break;
 			}
 		}
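
As a quick standalone illustration (class name made up; not code from the commit), the DataType that now replaces TimeIndicatorTypeInfo.ROWTIME_INDICATOR in the derived schema can be built and inspected like this:

import java.sql.Timestamp;

import org.apache.flink.table.types.AtomicDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.TimestampKind;
import org.apache.flink.table.types.logical.TimestampType;

public class RowtimeDataTypeSketch {
	public static void main(String[] args) {
		// A TIMESTAMP(3) carrying the ROWTIME kind, bridged to java.sql.Timestamp so the
		// legacy flink-planner can still consume the derived schema.
		DataType rowtime = new AtomicDataType(new TimestampType(true, TimestampKind.ROWTIME, 3))
				.bridgedTo(Timestamp.class);

		System.out.println(rowtime.getLogicalType());      // the rowtime TIMESTAMP(3) logical type
		System.out.println(rowtime.getConversionClass());  // class java.sql.Timestamp
	}
}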


[flink] 05/05: [FLINK-13495][table-planner-blink] Add table source and table sink it case using varchar/char/decimal precision

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5b90b16b1d2d4dc48a0edfd12844ab1ada871586
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Fri Aug 2 18:12:53 2019 +0200

    [FLINK-13495][table-planner-blink] Add table source and table sink it case using varchar/char/decimal precision
---
 .../runtime/batch/sql/TableSourceITCase.scala      | 28 ++++++-
 .../runtime/batch/table/TableSinkITCase.scala      | 88 ++++++++++++++++++++++
 .../runtime/stream/sql/TableSourceITCase.scala     | 69 ++++++++++++++++-
 .../runtime/stream/table/TableSinkITCase.scala     | 33 +++++++-
 .../planner/utils/MemoryTableSourceSinkUtil.scala  | 31 ++++++++
 .../table/planner/utils/testTableSources.scala     | 35 +++++++++
 6 files changed, 281 insertions(+), 3 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
index 66ec0a1..dcff996 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.table.api.{DataTypes, TableSchema, Types}
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData}
-import org.apache.flink.table.planner.utils.{TestFilterableTableSource, TestInputFormatTableSource, TestNestedProjectableTableSource, TestPartitionableTableSource, TestProjectableTableSource, TestTableSources}
+import org.apache.flink.table.planner.utils.{TestDataTypeTableSource, TestFilterableTableSource, TestInputFormatTableSource, TestNestedProjectableTableSource, TestPartitionableTableSource, TestProjectableTableSource, TestTableSources}
 import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter
 import org.apache.flink.types.Row
 
@@ -221,4 +221,30 @@ class TableSourceITCase extends BatchTestBase {
         row(3, "Hello world"))
     )
   }
+
+  @Test
+  def testDecimalSource(): Unit = {
+    val tableSchema = TableSchema.builder().fields(
+      Array("a", "b", "c", "d"),
+      Array(
+        DataTypes.INT(),
+        DataTypes.DECIMAL(5, 2),
+        DataTypes.VARCHAR(5),
+        DataTypes.CHAR(5))).build()
+    val tableSource = new TestDataTypeTableSource(
+      tableSchema,
+      Seq(
+        row(1, new java.math.BigDecimal(5.1), "1", "1"),
+        row(2, new java.math.BigDecimal(6.1), "12", "12"),
+        row(3, new java.math.BigDecimal(7.1), "123", "123")
+      ))
+    tEnv.registerTableSource("MyInputFormatTable", tableSource)
+    checkResult(
+      "SELECT a, b, c, d FROM MyInputFormatTable",
+      Seq(
+        row(1, "5.10", "1", "1"),
+        row(2, "6.10", "12", "12"),
+        row(3, "7.10", "123", "123"))
+    )
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/TableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/TableSinkITCase.scala
new file mode 100644
index 0000000..75b3395
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/TableSinkITCase.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.table
+
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{DataTypes, TableSchema}
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase
+import org.apache.flink.table.planner.runtime.utils.TestData.{data3, nullablesOfData3, type3}
+import org.apache.flink.table.planner.utils.MemoryTableSourceSinkUtil
+import org.apache.flink.table.planner.utils.MemoryTableSourceSinkUtil.{DataTypeAppendStreamTableSink, DataTypeOutputFormatTableSink}
+import org.apache.flink.test.util.TestBaseUtils
+
+import org.junit._
+
+import scala.collection.JavaConverters._
+
+class TableSinkITCase extends BatchTestBase {
+
+  @Test
+  def testDecimalOutputFormatTableSink(): Unit = {
+    MemoryTableSourceSinkUtil.clear()
+
+    val schema = TableSchema.builder()
+        .field("c", DataTypes.VARCHAR(5))
+        .field("b", DataTypes.DECIMAL(10, 0))
+        .field("d", DataTypes.CHAR(5))
+        .build()
+    val sink = new DataTypeOutputFormatTableSink(schema)
+    tEnv.registerTableSink("testSink", sink)
+
+    registerCollection("Table3", data3, type3, "a, b, c", nullablesOfData3)
+
+    tEnv.scan("Table3")
+        .where('a > 20)
+        .select("12345", 55.cast(DataTypes.DECIMAL(10, 0)), "12345".cast(DataTypes.CHAR(5)))
+        .insertInto("testSink")
+
+    tEnv.execute("")
+
+    val results = MemoryTableSourceSinkUtil.tableDataStrings.asJava
+    val expected = Seq("12345,55,12345").mkString("\n")
+
+    TestBaseUtils.compareResultAsText(results, expected)
+  }
+
+  @Test
+  def testDecimalAppendStreamTableSink(): Unit = {
+    MemoryTableSourceSinkUtil.clear()
+
+    val schema = TableSchema.builder()
+        .field("c", DataTypes.VARCHAR(5))
+        .field("b", DataTypes.DECIMAL(10, 0))
+        .field("d", DataTypes.CHAR(5))
+        .build()
+    val sink = new DataTypeAppendStreamTableSink(schema)
+    tEnv.registerTableSink("testSink", sink)
+
+    registerCollection("Table3", data3, type3, "a, b, c", nullablesOfData3)
+
+    tEnv.scan("Table3")
+        .where('a > 20)
+        .select("12345", 55.cast(DataTypes.DECIMAL(10, 0)), "12345".cast(DataTypes.CHAR(5)))
+        .insertInto("testSink")
+
+    tEnv.execute("")
+
+    val results = MemoryTableSourceSinkUtil.tableDataStrings.asJava
+    val expected = Seq("12345,55,12345").mkString("\n")
+
+    TestBaseUtils.compareResultAsText(results, expected)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala
index b1faaad..4cf4969 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala
@@ -23,8 +23,9 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.{DataTypes, TableSchema, Types}
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.{StreamingTestBase, TestData, TestingAppendSink}
-import org.apache.flink.table.planner.utils.{TestFilterableTableSource, TestInputFormatTableSource, TestNestedProjectableTableSource, TestPartitionableTableSource, TestProjectableTableSource, TestTableSources}
+import org.apache.flink.table.planner.utils.{TestDataTypeTableSource, TestFilterableTableSource, TestInputFormatTableSource, TestNestedProjectableTableSource, TestPartitionableTableSource, TestProjectableTableSource, TestStreamTableSource, TestTableSources}
 import org.apache.flink.types.Row
 
 import org.junit.Assert._
@@ -402,4 +403,70 @@ class TableSourceITCase extends StreamingTestBase {
     )
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
+
+  @Test
+  def testDecimalSource(): Unit = {
+    val tableSchema = TableSchema.builder().fields(
+      Array("a", "b", "c", "d"),
+      Array(
+        DataTypes.INT(),
+        DataTypes.DECIMAL(5, 2),
+        DataTypes.VARCHAR(5),
+        DataTypes.CHAR(5))).build()
+    val tableSource = new TestDataTypeTableSource(
+      tableSchema,
+      Seq(
+        row(1, new java.math.BigDecimal(5.1), "1", "1"),
+        row(2, new java.math.BigDecimal(6.1), "12", "12"),
+        row(3, new java.math.BigDecimal(7.1), "123", "123")
+      ))
+    tEnv.registerTableSource("MyInputFormatTable", tableSource)
+
+    val sink = new TestingAppendSink()
+    tEnv.sqlQuery("SELECT a, b, c, d FROM MyInputFormatTable").toAppendStream[Row].addSink(sink)
+
+    env.execute()
+
+    val expected = Seq(
+      "1,5.10,1,1",
+      "2,6.10,12,12",
+      "3,7.10,123,123"
+    )
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  /**
+    * A StreamTableSource must use the TypeInformation of the DataStream, so it will lose precision.
+    * Only the default-precision decimal is supported.
+    */
+  @Test
+  def testLegacyDecimalSourceUsingStreamTableSource(): Unit = {
+    val tableSchema = new TableSchema(
+      Array("a", "b", "c"),
+      Array(
+        Types.INT(),
+        Types.DECIMAL(),
+        Types.STRING()
+      ))
+    val tableSource = new TestStreamTableSource(
+      tableSchema,
+      Seq(
+        row(1, new java.math.BigDecimal(5.1), "1"),
+        row(2, new java.math.BigDecimal(6.1), "12"),
+        row(3, new java.math.BigDecimal(7.1), "123")
+      ))
+    tEnv.registerTableSource("MyInputFormatTable", tableSource)
+
+    val sink = new TestingAppendSink()
+    tEnv.sqlQuery("SELECT a, b, c FROM MyInputFormatTable").toAppendStream[Row].addSink(sink)
+
+    env.execute()
+
+    val expected = Seq(
+      "1,5.099999999999999645,1",
+      "2,6.099999999999999645,12",
+      "3,7.099999999999999645,123"
+    )
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
 }
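
The two expected result sets above follow directly from plain BigDecimal arithmetic. A small Flink-independent sketch (class name made up, rounding mode assumed) shows where 5.10 and 5.099999999999999645 come from:

import java.math.BigDecimal;
import java.math.RoundingMode;

public class DecimalPrecisionSketch {
	public static void main(String[] args) {
		// new BigDecimal(double) keeps the exact binary value of the double literal 5.1.
		BigDecimal d = new BigDecimal(5.1);
		System.out.println(d); // 5.0999999999999996447286321199499070644378662109375

		// With a declared DECIMAL(5, 2), the value is rounded to two fraction digits.
		System.out.println(d.setScale(2, RoundingMode.HALF_UP));  // 5.10

		// The legacy Types.DECIMAL() path only supports the default-precision decimal,
		// DECIMAL(38, 18) in the blink planner, hence the 18 fraction digits above.
		System.out.println(d.setScale(18, RoundingMode.HALF_UP)); // 5.099999999999999645
	}
}
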
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
index c013308..c889149 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
@@ -23,13 +23,15 @@ import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{TableException, Tumble, Types}
+import org.apache.flink.table.api.{DataTypes, TableException, TableSchema, Tumble, Types}
 import org.apache.flink.table.planner.runtime.utils.TestData.{smallTupleData3, tupleData3, tupleData5}
 import org.apache.flink.table.planner.runtime.utils.{TestingAppendTableSink, TestingRetractTableSink, TestingUpsertTableSink}
+import org.apache.flink.table.planner.utils.MemoryTableSourceSinkUtil.{DataTypeAppendStreamTableSink, DataTypeOutputFormatTableSink}
 import org.apache.flink.table.planner.utils.{MemoryTableSourceSinkUtil, TableTestUtil}
 import org.apache.flink.table.sinks._
 import org.apache.flink.test.util.{AbstractTestBase, TestBaseUtils}
 import org.apache.flink.types.Row
+
 import org.junit.Assert._
 import org.junit.Test
 
@@ -550,4 +552,33 @@ class TableSinkITCase extends AbstractTestBase {
 
     r.toRetractStream[Row]
   }
+
+  @Test
+  def testDecimalAppendStreamTableSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
+
+    MemoryTableSourceSinkUtil.clear()
+
+    val schema = TableSchema.builder()
+        .field("c", DataTypes.VARCHAR(5))
+        .field("b", DataTypes.DECIMAL(10, 0))
+        .field("d", DataTypes.CHAR(5))
+        .build()
+    val sink = new DataTypeAppendStreamTableSink(schema)
+    tEnv.registerTableSink("testSink", sink)
+
+    env.fromCollection(tupleData3)
+        .toTable(tEnv, 'a, 'b, 'c)
+        .where('a > 20)
+        .select("12345", 55.cast(DataTypes.DECIMAL(10, 0)), "12345".cast(DataTypes.CHAR(5)))
+        .insertInto("testSink")
+
+    tEnv.execute("")
+
+    val results = MemoryTableSourceSinkUtil.tableDataStrings.asJava
+    val expected = Seq("12345,55,12345").mkString("\n")
+
+    TestBaseUtils.compareResultAsText(results, expected)
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/MemoryTableSourceSinkUtil.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/MemoryTableSourceSinkUtil.scala
index e64dd21..0d263f6 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/MemoryTableSourceSinkUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/MemoryTableSourceSinkUtil.scala
@@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont
 import org.apache.flink.table.api.TableSchema
 import org.apache.flink.table.sinks.{AppendStreamTableSink, OutputFormatTableSink, TableSink, TableSinkBase}
 import org.apache.flink.table.sources._
+import org.apache.flink.table.types.DataType
 import org.apache.flink.table.util.TableConnectorUtil
 import org.apache.flink.types.Row
 
@@ -163,4 +164,34 @@ object MemoryTableSourceSinkUtil {
 
     override def close(): Unit = {}
   }
+
+  final class DataTypeOutputFormatTableSink(
+      schema: TableSchema) extends OutputFormatTableSink[Row] {
+
+    override def getConsumedDataType: DataType = schema.toRowDataType
+
+    override def getOutputFormat: OutputFormat[Row] = new MemoryCollectionOutputFormat
+
+    override def getTableSchema: TableSchema = schema
+
+    override def configure(
+        fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = this
+  }
+
+  final class DataTypeAppendStreamTableSink(
+      schema: TableSchema) extends AppendStreamTableSink[Row] {
+
+    override def getConsumedDataType: DataType = schema.toRowDataType
+
+    override def getTableSchema: TableSchema = schema
+
+    override def configure(
+        fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = this
+
+    override def consumeDataStream(dataStream: DataStream[Row]): DataStreamSink[_] = {
+      dataStream.writeUsingOutputFormat(new MemoryCollectionOutputFormat)
+    }
+
+    override def emitDataStream(dataStream: DataStream[Row]): Unit = ???
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala
index d2dcb1f..44cb4eb 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner.utils
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.io.InputFormat
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.io.CollectionInputFormat
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.core.io.InputSplit
@@ -33,6 +34,7 @@ import org.apache.flink.table.functions.BuiltInFunctionDefinitions
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.TimeTestUtil.EventTimeSourceFunction
+import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
 import org.apache.flink.table.sources._
 import org.apache.flink.table.sources.tsextractors.ExistingField
 import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, PreserveWatermarks}
@@ -620,3 +622,36 @@ class TestInputFormatTableSource[T](
 
   override def getTableSchema: TableSchema = tableSchema
 }
+
+class TestDataTypeTableSource(
+    tableSchema: TableSchema,
+    values: Seq[Row]) extends InputFormatTableSource[Row] {
+
+  override def getInputFormat: InputFormat[Row, _ <: InputSplit] = {
+    new CollectionInputFormat[Row](
+      values.asJava,
+      fromDataTypeToTypeInfo(getProducedDataType)
+          .createSerializer(new ExecutionConfig)
+          .asInstanceOf[TypeSerializer[Row]])
+  }
+
+  override def getReturnType: TypeInformation[Row] =
+    throw new RuntimeException("Should not invoke this deprecated method.")
+
+  override def getProducedDataType: DataType = tableSchema.toRowDataType
+
+  override def getTableSchema: TableSchema = tableSchema
+}
+
+class TestStreamTableSource(
+    tableSchema: TableSchema,
+    values: Seq[Row]) extends StreamTableSource[Row] {
+
+  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
+    execEnv.fromCollection(values, tableSchema.toRowType)
+  }
+
+  override def getReturnType: TypeInformation[Row] = tableSchema.toRowType
+
+  override def getTableSchema: TableSchema = tableSchema
+}