You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by "liuhong (Jira)" <ji...@apache.org> on 2021/08/03 13:09:00 UTC

[jira] liuhong shared "FLINK-23603: 动态查询sql后,使用toAppendStream将动态表转化为流时报错,org.apache.flink.table.api.TableException" with you

    liuhong shared an issue with you
    ---------------------------------

    

> 动态查询sql后,使用toAppendStream将动态表转化为流时报错,org.apache.flink.table.api.TableException
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-23603
>                 URL: https://issues.apache.org/jira/browse/FLINK-23603
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.13.1
>         Environment:  
> {code:java}
> pom.xml
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-java</artifactId>
>     <version>1.13.1</version>
>     <scope>provided</scope>
> </dependency>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-streaming-java_2.12</artifactId>
>     <version>1.13.1</version>
>     <scope>provided</scope>
> </dependency>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-clients_2.12</artifactId>
>     <version>1.13.1</version>
>     <scope>provided</scope>
> </dependency>
> <dependency>
>      <groupId>org.apache.flink</groupId>
>      <artifactId>flink-table-planner-blink_2.12</artifactId>
>      <version>1.13.1</version>
>      <scope>provided</scope>
>  </dependency>
>  <dependency>
>      <groupId>org.apache.flink</groupId>
>      <artifactId>flink-streaming-scala_2.12</artifactId>
>      <version>1.13.1</version>
>      <scope>provided</scope>
>  </dependency>
> {code}
> {code:java}
> import com.atguigu.chapter05.bean.Water1;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import static org.apache.flink.table.api.Expressions.$;
> public class Flink08_Time_ProcessingTime_DDL {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(1);
>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>         tEnv.executeSql("create table sensor(" +
>                 "id string," +
>                 "ts bigint," +
>                 "vc int" +
>                 //"pt as proctime()" +
>                 ") with (" +
>                 " 'connector' = 'filesystem' ," +
>                 " 'path' = 'input/water.txt' ," +
>                 " 'format' = 'csv' " +
>                 ")");
>         //tEnv.sqlQuery("select * from sensor").execute().print();
>         //Table t1 = tEnv.sqlQuery("select id,ts,vc hight from sensor");
>         Table t1 = tEnv.from("sensor");
>         Table t2 = t1.select($("id"), $("ts"),$("vc").as("height"));
>         /*t2.execute().print();
>         t2.printSchema();*/
>         tEnv.toAppendStream(t2, Water1.class).print();
>         env.execute();
>     }
> }
> {code}
> {code:java}
> import lombok.AllArgsConstructor;
> import lombok.Data;
> import lombok.NoArgsConstructor;
> @Data
> @NoArgsConstructor
> @AllArgsConstructor
> public class Water1 {
>     private String id;
>     private Long ts;
>     private Integer height;
> }
> {code}
> {panel:title=water.txt}
> sensor_1,1,1
> sensor_1,2,2
> sensor_2,3,45
> sensor_1,4,4
> sensor_2,6,9
> sensor_1,7,6
> sensor_3,8,7
> {panel}
>  
>  
>  
>            Reporter: liuhong
>            Priority: Major
>
> 当执行环境中Flink08_Time_ProcessingTime_DDL.main时会抛出以下异常,如果在Flink08_Time_ProcessingTime_DDL中修改
> Table t2 = t1.select($("id"), $("ts"),{color:#de350b}$("vc").as("height"){color});为
> Table t2 = t1.select($("id"),{color:#de350b}$("vc").as("height"){color}, $("ts"));则正常输出结果
> Exception in thread "main" org.apache.flink.table.api.TableException: height is not found in id, ts, vc
>     at org.apache.flink.table.planner.codegen.SinkCodeGenerator$.$anonfun$generateRowConverterOperator$1(SinkCodeGenerator.scala:83)
>     at org.apache.flink.table.planner.codegen.SinkCodeGenerator$.$anonfun$generateRowConverterOperator$1$adapted(SinkCodeGenerator.scala:79)
>     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>     at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
>     at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:194)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>     at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:194)
>     at org.apache.flink.table.planner.codegen.SinkCodeGenerator$.generateRowConverterOperator(SinkCodeGenerator.scala:79)
>     at org.apache.flink.table.planner.codegen.SinkCodeGenerator.generateRowConverterOperator(SinkCodeGenerator.scala)
>     at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacySink.translateToTransformation(CommonExecLegacySink.java:190)
>     at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacySink.translateToPlanInternal(CommonExecLegacySink.java:141)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>     at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:70)
>     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:69)
>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
>     at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.java:439)
>     at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:511)
>     at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:501)
>     at com.atguigu.chapter11.Flink08_Time_ProcessingTime_DDL.main(Flink08_Time_ProcessingTime_DDL.java:36)




--
This message was sent by Atlassian Jira
(v8.3.4#803005)