You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/12/24 09:30:33 UTC

[GitHub] [flink] wuchong commented on a change in pull request #10667: [FLINK-15313][table] Fix can't insert decimal data into sink using TypeInformation

wuchong commented on a change in pull request #10667: [FLINK-15313][table] Fix can't insert decimal data into sink using TypeInformation
URL: https://github.com/apache/flink/pull/10667#discussion_r361116539
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
 ##########
 @@ -18,67 +18,99 @@
 
 package org.apache.flink.table.planner.sinks
 
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.catalog.ObjectIdentifier
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.table.api.{TableException, TableSchema, Types, ValidationException}
+import org.apache.flink.table.catalog.{CatalogTable, ObjectIdentifier}
+import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.operations.CatalogSinkModifyOperation
-import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
-import org.apache.flink.table.runtime.types.PlannerTypeUtils
-import org.apache.flink.table.sinks.{PartitionableTableSink, TableSink}
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.plan.utils.RelOptUtils
+import org.apache.flink.table.sinks._
+import org.apache.flink.table.types.DataType
+import org.apache.flink.table.types.inference.TypeTransformations.{legacyDecimalToDefaultDecimal, toNullable}
+import org.apache.flink.table.types.logical.utils.{LogicalTypeCasts, LogicalTypeChecks}
+import org.apache.flink.table.types.logical.{LegacyTypeInformationType, RowType}
+import org.apache.flink.table.types.utils.DataTypeUtils
+import org.apache.flink.table.types.utils.TypeConversions.{fromLegacyInfoToDataType, fromLogicalToDataType}
+import org.apache.flink.table.utils.{TableSchemaUtils, TypeMappingUtils}
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.RelNode
 
 import scala.collection.JavaConversions._
 
 object TableSinkUtils {
 
   /**
-    * Checks if the given [[CatalogSinkModifyOperation]]'s query can be written to
-    * the given [[TableSink]]. It checks if the names & the field types match. If the table
-    * sink is a [[PartitionableTableSink]], also check that the partitions are valid.
+    * Checks if the given query can be written into the given sink. It checks the field types
+    * should be compatible (types should equal including precisions). If types are not compatible,
+    * but can be implicitly casted, a cast projection will be applied. Otherwise, an exception will
+    * be thrown.
+    *
+    * @param query the query to be checked
+    * @param sinkSchema the schema of sink to be checked
+    * @param typeFactory type factory
+    * @return the query RelNode which may be applied the implicitly cast projection.
+    */
+  def validateSchemaAndApplyImplicitCast(
+      query: RelNode,
+      sinkSchema: TableSchema,
+      typeFactory: FlinkTypeFactory,
+      sinkIdentifier: Option[String] = None): RelNode = {
+
+    val queryLogicalType = DataTypeUtils
+      // convert type to nullable, because we ignore nullability when writing query into sink
 
 Review comment:
   It is equivalent no matter `toNullable` or not, because if the nullability is not the same, we will add a implicit cast. The purpose to ignore nullable at earlier stage is in order to avoid cast project as much as possible, otherwise, we have to update a lot of XML plan files. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services