You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Mark You (JIRA)" <ji...@apache.org> on 2017/06/07 14:01:18 UTC
[jira] [Closed] (FLINK-6862) Tumble window rowtime not resolve at
logic plan validation
[ https://issues.apache.org/jira/browse/FLINK-6862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mark You closed FLINK-6862.
---------------------------
Resolution: Duplicate
> Tumble window rowtime not resolve at logic plan validation
> ----------------------------------------------------------
>
> Key: FLINK-6862
> URL: https://issues.apache.org/jira/browse/FLINK-6862
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.3.0
> Reporter: Mark You
>
> The following code sample works in version 1.2.1, but fails at 1.3.0
> {code:title=TumblingWindow.java|borderStyle=solid}
> public class TumblingWindow {
> public static void main(String[] args) throws Exception {
> List<Content> data = new ArrayList<Content>();
> data.add(new Content(1L, "Hi"));
> data.add(new Content(2L, "Hallo"));
> data.add(new Content(3L, "Hello"));
> data.add(new Content(4L, "Hello"));
> data.add(new Content(7L, "Hello"));
> data.add(new Content(8L, "Hello world"));
> data.add(new Content(16L, "Hello world"));
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
> DataStream<Content> stream = env.fromCollection(data);
> DataStream<Content> stream2 = stream.assignTimestampsAndWatermarks(
> new BoundedOutOfOrdernessTimestampExtractor<Content>(Time.milliseconds(1)) {
> /**
> *
> */
> private static final long serialVersionUID = 410512296011057717L;
> @Override
> public long extractTimestamp(Content element) {
> return element.getRecordTime();
> }
> });
> Table table = tableEnv.fromDataStream(stream2);
> table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w").select("w.start, content.count");
> env.execute();
> }
> public static class Content implements Serializable {
> private long recordTime;
> private String content;
> public Content() {
> super();
> }
> public Content(long recordTime, String content) {
> super();
> this.recordTime = recordTime;
> this.content = content;
> }
> public long getRecordTime() {
> return recordTime;
> }
> public void setRecordTime(long recordTime) {
> this.recordTime = recordTime;
> }
> public String getContent() {
> return content;
> }
> public void setContent(String content) {
> this.content = content;
> }
> }
> private class TimestampWithEqualWatermark implements AssignerWithPunctuatedWatermarks<Object[]> {
> /**
> *
> */
> private static final long serialVersionUID = 1L;
> @Override
> public long extractTimestamp(Object[] element, long previousElementTimestamp) {
> // TODO Auto-generated method stub
> return (long) element[0];
> }
> @Override
> public Watermark checkAndGetNextWatermark(Object[] lastElement, long extractedTimestamp) {
> return new Watermark(extractedTimestamp);
> }
> }
> }
> {code}
> Exception trace:
> {noformat}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: Cannot resolve [rowtime] given input [content, recordTime].
> at org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:143)
> at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:86)
> at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:83)
> at org.apache.flink.table.plan.TreeNode.postOrderTransform(TreeNode.scala:72)
> at org.apache.flink.table.plan.logical.LogicalNode.org$apache$flink$table$plan$logical$LogicalNode$$expressionPostOrderTransform$1(LogicalNode.scala:119)
> at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$7$$anonfun$apply$1.apply(LogicalNode.scala:132)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$7.apply(LogicalNode.scala:131)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
> at scala.collection.Iterator$class.foreach(Iterator.scala:742)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
> at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:308)
> at scala.collection.AbstractIterator.to(Iterator.scala:1194)
> at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:300)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1194)
> at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:287)
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1194)
> at org.apache.flink.table.plan.logical.LogicalNode.expressionPostOrderTransform(LogicalNode.scala:137)
> at org.apache.flink.table.plan.logical.LogicalNode.validate(LogicalNode.scala:83)
> at org.apache.flink.table.plan.logical.Project.validate(operators.scala:67)
> at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1054)
> at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1073)
> at com.taiwanmobile.cep.noc.TumblingWindow.main(TumblingWindow.java:54)
> {noformat}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)