You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "zhangjing (JIRA)" <ji...@apache.org> on 2017/02/14 01:47:41 UTC

[jira] [Closed] (FLINK-5737) Fix the bug when TableSource contains a field of byte[] type

     [ https://issues.apache.org/jira/browse/FLINK-5737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhangjing closed FLINK-5737.
----------------------------
    Resolution: Fixed

> Fix the bug when TableSource contains a field of byte[] type
> ------------------------------------------------------------
>
>                 Key: FLINK-5737
>                 URL: https://issues.apache.org/jira/browse/FLINK-5737
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: zhangjing
>            Assignee: zhangjing
>
> At current, if a TableSource contains a field of byte[] type, TableException would be thrown when optimize RelNode tree.
> If we run the following code, logBlockTableSource contain one field: f0,  which is  byte[] type.
>  {code}
> 		tableEnv.registerTableSource("t1", logBlockTableSource);
> 		tableEnv.registerFunction("parse", new BinaryParser());
> 		Table ttDatas = tableEnv.sql("select parse(f0) from t1");
> 		DataStream<String> result = tableEnv.toDataStream(ttDatas, String.class);
> 		result.addSink(new PrintSinkFunction<String>());
> 		env.execute();
> 	public static class BinaryParser extends ScalarFunction {
> 		public String eval(byte[] bytes) {
> 			return new String(bytes);
> 		}
> 	}
>  {code}
> we would get the following exception:
> {code}
> Exception in thread "main" java.lang.AssertionError: Internal error: Error occurred while applying rule StreamTableSourceScanRule
> 	at org.apache.calcite.util.Util.newInternal(Util.java:792)
> 	at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:148)
> 	at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:225)
> 	at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:117)
> 	at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:213)
> 	at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:819)
> 	at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:334)
> 	at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:264)
> 	at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:231)
> 	at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:259)
> 	at org.apache.flink.table.api.java.StreamTableEnvironment.toDataStream(StreamTableEnvironment.scala:148)
> 	at com.alibaba.blink.streaming.connectors.tt.examples.TT4TableSourceExample.main(TT4TableSourceExample.java:51)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> Caused by: org.apache.flink.table.api.TableException: Unsupported data type encountered: ARRAY
> 	at org.apache.flink.table.api.TableException$.apply(exceptions.scala:51)
> 	at org.apache.flink.table.plan.nodes.FlinkRel$$anonfun$estimateRowSize$2.apply(FlinkRel.scala:124)
> 	at org.apache.flink.table.plan.nodes.FlinkRel$$anonfun$estimateRowSize$2.apply(FlinkRel.scala:108)
> 	at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
> 	at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
> 	at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:47)
> 	at org.apache.flink.table.plan.nodes.FlinkRel$class.estimateRowSize(FlinkRel.scala:108)
> 	at org.apache.flink.table.plan.nodes.datastream.StreamScan.estimateRowSize(StreamScan.scala:37)
> 	at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.computeSelfCost(StreamTableSourceScan.scala:46)
> 	at org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:162)
> 	at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown Source)
> 	at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown Source)
> 	at org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:258)
> 	at org.apache.calcite.plan.volcano.VolcanoPlanner.getCost(VolcanoPlanner.java:1128)
> 	at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements0(RelSubset.java:336)
> 	at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements(RelSubset.java:319)
> 	at org.apache.calcite.plan.volcano.VolcanoPlanner.addRelToSet(VolcanoPlanner.java:1830)
> 	at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1766)
> 	at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:1032)
> 	at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1052)
> 	at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1942)
> 	at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:136)
> {code}
>   this is because all PrimitiveArrayTypeInfo typeInformation (including byte[], boolean[], short[], int[],  long[], float[], double[], char[]) would transfer to ArrayRelDataType, but estimateRowSize method in FlinkRel class does not support Array SqlType field.
> Solution:
> Maybe it's better to transfer byte[] typeInformation to BINARY, VARBINARY SqlType rather than ArrayRelDataType sqlType. And when estimate  BINARY, VARBINARY field size, we could give the estimate value just like the VARCHAR value. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)