You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "liuhong (Jira)" <ji...@apache.org> on 2021/08/03 13:00:00 UTC

[jira] [Created] (FLINK-23603) 动态查询sql后,使用toAppendStream将动态表转化为流时报错,org.apache.flink.table.api.TableException

liuhong created FLINK-23603:
-------------------------------

             Summary: 动态查询sql后,使用toAppendStream将动态表转化为流时报错,org.apache.flink.table.api.TableException
                 Key: FLINK-23603
                 URL: https://issues.apache.org/jira/browse/FLINK-23603
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.13.1
         Environment:  
{code:xml}
pom.xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.13.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.13.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.13.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-table-planner-blink_2.12</artifactId>
     <version>1.13.1</version>
     <scope>provided</scope>
 </dependency>
 <dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-streaming-scala_2.12</artifactId>
     <version>1.13.1</version>
     <scope>provided</scope>
 </dependency>
{code}
{code:java}
import com.atguigu.chapter05.bean.Water1;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;

/**
 * Minimal reproduction program for the reported problem.
 *
 * <p>It declares a table {@code sensor} through SQL DDL (filesystem connector, CSV format,
 * path {@code input/water.txt}), projects it with the Table API while renaming column
 * {@code vc} to {@code height}, and converts the resulting table into an append stream of
 * {@code Water1} objects that is printed to stdout.
 */
public class Flink08_Time_ProcessingTime_DDL {

    /**
     * Builds the table pipeline and runs the job.
     *
     * @param args command-line arguments; not read by this method
     * @throws Exception propagated from the Flink calls below (e.g. {@code env.execute()})
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism is fixed to 1 for this job.
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Declare the source table: three columns (id, ts, vc) read from a CSV file.
        // The processing-time column "pt" is commented out inside the DDL string.
        tEnv.executeSql("create table sensor(" +
                "id string," +
                "ts bigint," +
                "vc int" +
                //"pt as proctime()" +
                ") with (" +
                " 'connector' = 'filesystem' ," +
                " 'path' = 'input/water.txt' ," +
                " 'format' = 'csv' " +
                ")");
        //tEnv.sqlQuery("select * from sensor").execute().print();
        //Table t1 = tEnv.sqlQuery("select id,ts,vc hight from sensor");
        Table t1 = tEnv.from("sensor");

        // Projection with column order (id, ts, height); "vc" is renamed to "height".
        // NOTE(review): per the accompanying report, this ordering is the one that fails in
        // toAppendStream, while selecting (id, height, ts) is reported to work.
        Table t2 = t1.select($("id"), $("ts"),$("vc").as("height"));
        /*t2.execute().print();
        t2.printSchema();*/
        // Convert the projected table to a DataStream of Water1 and print each record.
        // The reported stack trace ends in a toAppendStream call from this class's main.
        tEnv.toAppendStream(t2, Water1.class).print();
        env.execute();

    }
}
{code}
{code:java}
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Plain data holder used as the target type when the projected table is converted to a
 * stream in the reproduction program.
 *
 * <p>Accessors and constructors are not written by hand; they are supplied by the Lombok
 * annotations on the class ({@code @Data}, {@code @NoArgsConstructor},
 * {@code @AllArgsConstructor}).
 *
 * <p>NOTE(review): the field names (id, ts, height) match the column names produced by the
 * projection in the reproduction program; presumably the conversion matches fields by
 * name — confirm against the Flink documentation.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Water1 {

    // Sensor identifier; corresponds to the string column "id".
    private String id;
    // Corresponds to the bigint column "ts".
    private Long ts;
    // Corresponds to the int column "vc" after it is renamed to "height".
    private Integer height;
}
{code}
{panel:title=water.txt}
sensor_1,1,1
sensor_1,2,2
sensor_2,3,45
sensor_1,4,4
sensor_2,6,9
sensor_1,7,6
sensor_3,8,7
{panel}
 

 

 
            Reporter: liuhong


在上述环境中执行 Flink08_Time_ProcessingTime_DDL.main 时会抛出以下异常;如果在 Flink08_Time_ProcessingTime_DDL 中修改

Table t2 = t1.select($("id"), $("ts"),{color:#de350b}$("vc").as("height"){color});为

Table t2 = t1.select($("id"),{color:#de350b}$("vc").as("height"){color}, $("ts"));则正常输出结果

Exception in thread "main" org.apache.flink.table.api.TableException: height is not found in id, ts, vc
    at org.apache.flink.table.planner.codegen.SinkCodeGenerator$.$anonfun$generateRowConverterOperator$1(SinkCodeGenerator.scala:83)
    at org.apache.flink.table.planner.codegen.SinkCodeGenerator$.$anonfun$generateRowConverterOperator$1$adapted(SinkCodeGenerator.scala:79)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:194)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:194)
    at org.apache.flink.table.planner.codegen.SinkCodeGenerator$.generateRowConverterOperator(SinkCodeGenerator.scala:79)
    at org.apache.flink.table.planner.codegen.SinkCodeGenerator.generateRowConverterOperator(SinkCodeGenerator.scala)
    at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacySink.translateToTransformation(CommonExecLegacySink.java:190)
    at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacySink.translateToPlanInternal(CommonExecLegacySink.java:141)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
    at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:70)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:69)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.java:439)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:511)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:501)
    at com.atguigu.chapter11.Flink08_Time_ProcessingTime_DDL.main(Flink08_Time_ProcessingTime_DDL.java:36)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)