Posted to issues@flink.apache.org by "hehuiyuan (Jira)" <ji...@apache.org> on 2020/12/30 07:30:00 UTC

[jira] [Commented] (FLINK-20301) Flink sql 1.10 : Legacy Decimal and decimal for Array that is not Compatible

    [ https://issues.apache.org/jira/browse/FLINK-20301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17256349#comment-17256349 ] 

hehuiyuan commented on FLINK-20301:
-----------------------------------

ARRAY<DECIMAL> and ARRAY<LEGACY(DECIMAL)> are not compatible.

> Flink sql 1.10 : Legacy Decimal and decimal  for Array  that is not Compatible
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-20301
>                 URL: https://issues.apache.org/jira/browse/FLINK-20301
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.10.0
>            Reporter: hehuiyuan
>            Priority: Minor
>         Attachments: image-2020-11-23-23-48-02-102.png, image-2020-12-24-18-45-26-403.png
>
>
> The error log:
>  
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: Type ARRAY<DECIMAL(38, 18)> of table field 'numbers' does not match with the physical type ARRAY<LEGACY('DECIMAL', 'DECIMAL')> of the 'numbers' field of the TableSource return type.
>     at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:160)
>     at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:185)
>     at org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:246)
>     at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
>     at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>     at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>     at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>     at org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:228)
>     at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:206)
>     at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:110)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:118)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>     at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>     at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:685)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
> {code}
>  
>  
>  
> Background:
> Flink SQL, Blink planner, version 1.10.
> The schema for the TableSource is defined as JSON:
>  
> {code:java}
>  "{type:'object',properties:{age:{type:'number'},numbers: { type: 'array', items: { type: 'number' } },name:{type:'string'},dt:{type: 'string', format: 'date-time'},timehour:{type: 'string', format: 'time'} }}"
> {code}
>  
> Validation throws this exception:
> Type ARRAY<DECIMAL(38, 18)> of table field 'numbers' does not match with the physical type ARRAY<LEGACY('DECIMAL', 'DECIMAL')> of the 'numbers' field of the TableSource return type.
> The compatibility check should also cover the type `Array[Decimal]`.
> !image-2020-11-23-23-48-02-102.png|width=594,height=364!
>  
> I think a `visit(ArrayType sourceType)` case should be added to the check:
> {code:java}
> @Override
> public Boolean visit(ArrayType sourceType) {
>    // Special handling only for arrays whose elements are legacy decimals.
>    if (sourceType.getElementType() instanceof LegacyTypeInformationType
>       && sourceType.getElementType().getTypeRoot() == LogicalTypeRoot.DECIMAL) {
>       // targetType comes from the enclosing scope; it must be an array of DECIMAL as well.
>       if (!(targetType instanceof ArrayType && ((ArrayType) targetType).getElementType() instanceof DecimalType)) {
>          return false;
>       }
>       DecimalType logicalDecimalType = (DecimalType) ((ArrayType) targetType).getElementType();
>       // Same rule as for a top-level legacy decimal: only DECIMAL(38, 18) is allowed.
>       if (logicalDecimalType.getPrecision() != DecimalType.MAX_PRECISION ||
>          logicalDecimalType.getScale() != 18) {
>          throw new ValidationException(
>             "Legacy decimal type can only be mapped to DECIMAL(38, 18).");
>       }
>    }
>    return true;
> }
> {code}
>  
>  
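The idea behind the proposed `visit(ArrayType)` can be tried out without Flink on the classpath. The following is only a minimal sketch on a toy type model: `Type`, `Decimal`, `LegacyDecimal`, `Array` and `isCompatible` are hypothetical stand-ins, not Flink classes, and `IllegalArgumentException` stands in for Flink's `ValidationException`. It assumes the rule from the error message above (a legacy decimal may only map to DECIMAL(38, 18)) and applies it recursively to array element types.

```java
import java.util.Objects;

// Toy model only: none of these classes are Flink APIs.
class LegacyDecimalCheck {

    /** Hypothetical stand-in for a logical type. */
    static abstract class Type {}

    /** Stand-in for DECIMAL(precision, scale). */
    static final class Decimal extends Type {
        final int precision;
        final int scale;

        Decimal(int precision, int scale) {
            this.precision = precision;
            this.scale = scale;
        }
    }

    /** Stand-in for LEGACY('DECIMAL', 'DECIMAL'). */
    static final class LegacyDecimal extends Type {}

    /** Stand-in for ARRAY<element>. */
    static final class Array extends Type {
        final Type element;

        Array(Type element) {
            this.element = Objects.requireNonNull(element);
        }
    }

    static final int MAX_PRECISION = 38;
    static final int LEGACY_SCALE = 18;

    /** Returns true if the physical type may back the logical type. */
    static boolean isCompatible(Type physical, Type logical) {
        if (physical instanceof LegacyDecimal) {
            if (!(logical instanceof Decimal)) {
                return false;
            }
            Decimal d = (Decimal) logical;
            if (d.precision != MAX_PRECISION || d.scale != LEGACY_SCALE) {
                throw new IllegalArgumentException(
                    "Legacy decimal type can only be mapped to DECIMAL(38, 18).");
            }
            return true;
        }
        if (physical instanceof Array) {
            // Recurse into the element type so nested legacy decimals are handled too.
            return logical instanceof Array
                && isCompatible(((Array) physical).element, ((Array) logical).element);
        }
        if (physical instanceof Decimal) {
            // Plain decimals must match exactly in this toy model.
            return logical instanceof Decimal
                && ((Decimal) physical).precision == ((Decimal) logical).precision
                && ((Decimal) physical).scale == ((Decimal) logical).scale;
        }
        return false;
    }

    public static void main(String[] args) {
        Type physical = new Array(new LegacyDecimal());
        Type logical = new Array(new Decimal(38, 18));
        System.out.println(isCompatible(physical, logical)); // prints "true"
    }
}
```

Recursing on the element type, rather than special-casing a single array level as in the snippet quoted above, also covers nested arrays such as ARRAY<ARRAY<LEGACY(DECIMAL)>>. Whether the real check should behave this way is an open question for the issue.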


