You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by "liuhong (Jira)" <ji...@apache.org> on 2021/08/03 13:09:00 UTC
[jira] liuhong shared "FLINK-23603: 动态查询sql后,使用toAppendStream将动态表转化为流时报错,org.apache.flink.table.api.TableException" with you
liuhong shared an issue with you
---------------------------------
> 动态查询sql后,使用toAppendStream将动态表转化为流时报错,org.apache.flink.table.api.TableException
> ------------------------------------------------------------------------------
>
> Key: FLINK-23603
> URL: https://issues.apache.org/jira/browse/FLINK-23603
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.13.1
> Environment:
> {code:java}
> pom.xml
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-java</artifactId>
> <version>1.13.1</version>
> <scope>provided</scope>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-streaming-java_2.12</artifactId>
> <version>1.13.1</version>
> <scope>provided</scope>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-clients_2.12</artifactId>
> <version>1.13.1</version>
> <scope>provided</scope>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-table-planner-blink_2.12</artifactId>
> <version>1.13.1</version>
> <scope>provided</scope>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-streaming-scala_2.12</artifactId>
> <version>1.13.1</version>
> <scope>provided</scope>
> </dependency>
> {code}
> {code:java}
> import com.atguigu.chapter05.bean.Water1;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import static org.apache.flink.table.api.Expressions.$;
> public class Flink08_Time_ProcessingTime_DDL {
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> tEnv.executeSql("create table sensor(" +
> "id string," +
> "ts bigint," +
> "vc int" +
> //"pt as proctime()" +
> ") with (" +
> " 'connector' = 'filesystem' ," +
> " 'path' = 'input/water.txt' ," +
> " 'format' = 'csv' " +
> ")");
> //tEnv.sqlQuery("select * from sensor").execute().print();
> //Table t1 = tEnv.sqlQuery("select id,ts,vc hight from sensor");
> Table t1 = tEnv.from("sensor");
> Table t2 = t1.select($("id"), $("ts"),$("vc").as("height"));
> /*t2.execute().print();
> t2.printSchema();*/
> tEnv.toAppendStream(t2, Water1.class).print();
> env.execute();
> }
> }
> {code}
> {code:java}
> import lombok.AllArgsConstructor;
> import lombok.Data;
> import lombok.NoArgsConstructor;
> @Data
> @NoArgsConstructor
> @AllArgsConstructor
> public class Water1 {
> private String id;
> private Long ts;
> private Integer height;
> }
> {code}
> {panel:title=water.txt}
> sensor_1,1,1
> sensor_1,2,2
> sensor_2,3,45
> sensor_1,4,4
> sensor_2,6,9
> sensor_1,7,6
> sensor_3,8,7
> {panel}
>
>
>
> Reporter: liuhong
> Priority: Major
>
> 当执行环境中Flink08_Time_ProcessingTime_DDL.main时会抛出以下异常,如果在Flink08_Time_ProcessingTime_DDL中修改
> Table t2 = t1.select($("id"), $("ts"),{color:#de350b}$("vc").as("height")){color};为
> Table t2 = t1.select($("id"),{color:#de350b}$("vc").as("height"){color}, $("ts"));则正常输出结果
> Exception in thread "main" org.apache.flink.table.api.TableException: height is not found in id, ts, vcException in thread "main" org.apache.flink.table.api.TableException: height is not found in id, ts, vc at org.apache.flink.table.planner.codegen.SinkCodeGenerator$.$anonfun$generateRowConverterOperator$1(SinkCodeGenerator.scala:83) at org.apache.flink.table.planner.codegen.SinkCodeGenerator$.$anonfun$generateRowConverterOperator$1$adapted(SinkCodeGenerator.scala:79) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32) at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:194) at scala.collection.TraversableLike.map(TraversableLike.scala:233) at scala.collection.TraversableLike.map$(TraversableLike.scala:226) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:194) at org.apache.flink.table.planner.codegen.SinkCodeGenerator$.generateRowConverterOperator(SinkCodeGenerator.scala:79) at org.apache.flink.table.planner.codegen.SinkCodeGenerator.generateRowConverterOperator(SinkCodeGenerator.scala) at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacySink.translateToTransformation(CommonExecLegacySink.java:190) at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacySink.translateToPlanInternal(CommonExecLegacySink.java:141) at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134) at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:70) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike.map(TraversableLike.scala:233) at scala.collection.TraversableLike.map$(TraversableLike.scala:226) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:69) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165) at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.java:439) at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:511) at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:501) at com.atguigu.chapter11.Flink08_Time_ProcessingTime_DDL.main(Flink08_Time_ProcessingTime_DDL.java:36)
--
This message was sent by Atlassian Jira
(v8.3.4#803005)