You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "liuhong (Jira)" <ji...@apache.org> on 2021/08/03 13:00:00 UTC
[jira] [Created] (FLINK-23603) 动态查询sql后,使用toAppendStream将动态表转化为流时报错,org.apache.flink.table.api.TableException
liuhong created FLINK-23603:
-------------------------------
Summary: 动态查询sql后,使用toAppendStream将动态表转化为流时报错,org.apache.flink.table.api.TableException
Key: FLINK-23603
URL: https://issues.apache.org/jira/browse/FLINK-23603
Project: Flink
Issue Type: Bug
Components: Table SQL / API
Affects Versions: 1.13.1
Environment:
{code:java}
pom.xml
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.13.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.13.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.13.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.13.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.13.1</version>
<scope>provided</scope>
</dependency>
{code}
{code:java}
import com.atguigu.chapter05.bean.Water1;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
/**
 * Reproducer for FLINK-23603 (Flink 1.13.1, blink planner).
 *
 * <p>Registers a CSV-backed table via DDL, projects three columns with one of them
 * renamed, and converts the result to an append-only {@code DataStream<Water1>}.
 * According to this report, the conversion fails with
 * {@code TableException: height is not found in id, ts, vc} when the aliased column
 * is the LAST projected column, and succeeds when it is projected second.
 * The statement order below is therefore significant and must not be rearranged.
 */
public class Flink08_Time_ProcessingTime_DDL {
/**
 * Builds and runs the failing pipeline.
 *
 * @param args command-line arguments; not read by this method
 * @throws Exception propagated from the Flink calls below
 */
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Run the job with parallelism 1.
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// Register table "sensor" (id STRING, ts BIGINT, vc INT) backed by the CSV file
// 'input/water.txt' through the filesystem connector.
tEnv.executeSql("create table sensor(" +
"id string," +
"ts bigint," +
"vc int" +
//"pt as proctime()" +
") with (" +
" 'connector' = 'filesystem' ," +
" 'path' = 'input/water.txt' ," +
" 'format' = 'csv' " +
")");
//tEnv.sqlQuery("select * from sensor").execute().print();
//Table t1 = tEnv.sqlQuery("select id,ts,vc hight from sensor");
Table t1 = tEnv.from("sensor");
// Project id, ts, and vc renamed to "height". Per the report, this column order
// (alias last) triggers the failure; select(id, vc as height, ts) does not.
Table t2 = t1.select($("id"), $("ts"),$("vc").as("height"));
/*t2.execute().print();
t2.printSchema();*/
// Convert the table to an append-only stream of Water1 and print each record.
// The reported stack trace shows the TableException being thrown from this
// toAppendStream call (during plan translation), i.e. before env.execute() runs.
tEnv.toAppendStream(t2, Water1.class).print();
env.execute();
}
}
{code}
{code:java}
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * Target POJO for {@code toAppendStream(t2, Water1.class)} in the FLINK-23603 reproducer.
 *
 * <p>The field names ({@code id}, {@code ts}, {@code height}) match the column names
 * projected by the reproducer, where {@code height} is the alias given to column
 * {@code vc}. Accessors and constructors are not written by hand; they are expected
 * to be generated by the Lombok annotations below.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Water1 {
// Corresponds to table column "id" (declared as string in the DDL).
private String id;
// Corresponds to table column "ts" (declared as bigint in the DDL).
private Long ts;
// Corresponds to table column "vc" (declared as int in the DDL), projected under the alias "height".
private Integer height;
}
{code}
{panel:title=water.txt}
sensor_1,1,1
sensor_1,2,2
sensor_2,3,45
sensor_1,4,4
sensor_2,6,9
sensor_1,7,6
sensor_3,8,7
{panel}
Reporter: liuhong
在上述环境中执行 Flink08_Time_ProcessingTime_DDL.main 时会抛出以下异常。如果将 Flink08_Time_ProcessingTime_DDL 中的
Table t2 = t1.select($("id"), $("ts"), {color:#de350b}$("vc").as("height"){color}); 修改为
Table t2 = t1.select($("id"), {color:#de350b}$("vc").as("height"){color}, $("ts")); 则可以正常输出结果。
Exception in thread "main" org.apache.flink.table.api.TableException: height is not found in id, ts, vc
    at org.apache.flink.table.planner.codegen.SinkCodeGenerator$.$anonfun$generateRowConverterOperator$1(SinkCodeGenerator.scala:83)
    at org.apache.flink.table.planner.codegen.SinkCodeGenerator$.$anonfun$generateRowConverterOperator$1$adapted(SinkCodeGenerator.scala:79)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:194)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:194)
    at org.apache.flink.table.planner.codegen.SinkCodeGenerator$.generateRowConverterOperator(SinkCodeGenerator.scala:79)
    at org.apache.flink.table.planner.codegen.SinkCodeGenerator.generateRowConverterOperator(SinkCodeGenerator.scala)
    at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacySink.translateToTransformation(CommonExecLegacySink.java:190)
    at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacySink.translateToPlanInternal(CommonExecLegacySink.java:141)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
    at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:70)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:69)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.java:439)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:511)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:501)
    at com.atguigu.chapter11.Flink08_Time_ProcessingTime_DDL.main(Flink08_Time_ProcessingTime_DDL.java:36)
--
This message was sent by Atlassian Jira
(v8.3.4#803005)