You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 邓成刚【qq】 <bn...@qq.com> on 2019/03/25 09:04:12 UTC

flink-question is about two streams(from kafka) union all then distinct

Hi, everyone. I have a question. I would like to ask you a question.
 
Problem Description:
     I have two tables (streams from kafka),Both tables define rowTime attributes in EVENTTIME
 
     table1(EVENTTIME,NEW_EVENT_ID,F4,F6)
     table2(EVENTTIME,NEW_EVENT_ID,F2,F3) 
 
now,I would like to use UNION ALL for two streams and distinct them as follows:
 
Table id_distinct = tableEnv.sqlQuery("select distinct EVENTTIME,NEW_EVENT_IDfrom (select EVENTTIME,NEW_EVENT_ID FROM table1 
                                                                union all select EVENTTIME,NEW_EVENT_ID FROM table2)");
 
Question: It will report the following exception. How can I fix this problem? Thank you!
 
Exception in thread "main" java.lang.AssertionError: Type mismatch:
rowtype of new rel:
RecordType(TIMESTAMP(3) NOT NULL EVENTTIME, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" NEW_EVENT_ID) NOT NULL
rowtype of set:
RecordType(TIMESTAMP(3) NOT NULL EVENTTIME, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" NEW_EVENT_ID) NOT NULL
at org.apache.calcite.util.Litmus$1.fail(Litmus.java:31)
at org.apache.calcite.plan.RelOptUtil.equal(RelOptUtil.java:1857)
at org.apache.calcite.plan.volcano.RelSubset.add(RelSubset.java:276)
at org.apache.calcite.plan.volcano.RelSet.add(RelSet.java:148)
at org.apache.calcite.plan.volcano.VolcanoPlanner.addRelToSet(VolcanoPlanner.java:1633)
at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1579)
at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:859)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:879)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1755)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:135)
at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:234)
at org.apache.calcite.rel.rules.ProjectSetOpTransposeRule.onMatch(ProjectSetOpTransposeRule.java:109)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:373)
at org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:305)
at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:248)
at com.nsn.flink.service.DealRegisterFile12.main(DealRegisterFile12.java:80)
 
 
deng