Posted to issues@flink.apache.org by "Fabian Hueske (JIRA)" <ji...@apache.org> on 2017/06/07 12:15:18 UTC

[jira] [Commented] (FLINK-6862) Tumble window rowtime not resolve at logic plan validation

    [ https://issues.apache.org/jira/browse/FLINK-6862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16040775#comment-16040775 ] 

Fabian Hueske commented on FLINK-6862:
--------------------------------------

The way the Table API and SQL handle time changed between 1.2 and 1.3.
Unfortunately, the documentation has not been updated yet (we are in the process of restructuring and rewriting large parts). 

The time handling is already covered in this commit: https://github.com/apache/flink/commit/cd329420eba2210a0a3b49579114d327056b2ed8

Summary: {{rowtime}} is no longer a keyword but must be added as an attribute to your table. This can be done when converting the {{DataStream}} into a {{Table}} as follows: 

{code}
Table table = tableEnv.fromDataStream(stream2, "recordTime, content, rowtime.rowtime");
{code}

This adds a new timestamp attribute called {{rowtime}} to the schema. The attribute takes its name from the first part of {{rowtime.rowtime}}, and its value is the extracted timestamp, which is what the second {{rowtime}} of {{rowtime.rowtime}} denotes.
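
To make the two roles in {{rowtime.rowtime}} concrete, here is a minimal plain-Java sketch of how such a field list can be read. It is a toy model only, not Flink's actual parser: the class FieldListSketch, its Field type and its parse method are hypothetical names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: this is NOT Flink's parser. It mimics how a field list such as
// "recordTime, content, rowtime.rowtime" splits into attribute names and a marker.
final class FieldListSketch {

    /** One entry of the field list (hypothetical type for this sketch). */
    static final class Field {
        final String name;        // attribute name that ends up in the table schema
        final boolean eventTime;  // true if the entry carried the ".rowtime" suffix

        Field(String name, boolean eventTime) {
            this.name = name;
            this.eventTime = eventTime;
        }
    }

    private static final String SUFFIX = ".rowtime";

    static List<Field> parse(String fieldList) {
        List<Field> fields = new ArrayList<>();
        for (String raw : fieldList.split(",")) {
            String entry = raw.trim();
            if (entry.endsWith(SUFFIX)) {
                // The part before the suffix names the new attribute; the suffix
                // marks it as carrying the extracted event timestamp.
                String name = entry.substring(0, entry.length() - SUFFIX.length());
                fields.add(new Field(name, true));
            } else {
                // Ordinary field taken from the stream's record type.
                fields.add(new Field(entry, false));
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        for (Field f : parse("recordTime, content, rowtime.rowtime")) {
            System.out.println(f.name + (f.eventTime ? " (event-time attribute)" : ""));
        }
    }
}
```

In this model the third entry yields an attribute named {{rowtime}}, which is the name the {{on("rowtime")}} clause of the window in the issue's code looks for.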


> Tumble window rowtime not resolve at logic plan validation
> ----------------------------------------------------------
>
>                 Key: FLINK-6862
>                 URL: https://issues.apache.org/jira/browse/FLINK-6862
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.3.0
>            Reporter: Mark You
>
> The following code sample works in version 1.2.1 but fails in 1.3.0:
> {code:title= TumblingWindow.java|borderStyle=solid}
> public class TumblingWindow {
>     public static void main(String[] args) throws Exception {
>         List<Content> data = new ArrayList<Content>();
>         data.add(new Content(1L, "Hi"));
>         data.add(new Content(2L, "Hallo"));
>         data.add(new Content(3L, "Hello"));
>         data.add(new Content(4L, "Hello"));
>         data.add(new Content(7L, "Hello"));
>         data.add(new Content(8L, "Hello world"));
>         data.add(new Content(16L, "Hello world"));
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>         final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
>         DataStream<Content> stream = env.fromCollection(data);
>         DataStream<Content> stream2 = stream.assignTimestampsAndWatermarks(
>                 new BoundedOutOfOrdernessTimestampExtractor<Content>(Time.milliseconds(1)) {
>                     /**
>                      * 
>                      */
>                     private static final long serialVersionUID = 410512296011057717L;
>                     @Override
>                     public long extractTimestamp(Content element) {
>                         return element.getRecordTime();
>                     }
>                 });
>         Table table = tableEnv.fromDataStream(stream2);
>         table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w").select("w.start, content.count");
>         env.execute();
>     }
>     public static class Content implements Serializable {
>         private long recordTime;
>         private String content;
>         public Content() {
>             super();
>         }
>         public Content(long recordTime, String content) {
>             super();
>             this.recordTime = recordTime;
>             this.content = content;
>         }
>         public long getRecordTime() {
>             return recordTime;
>         }
>         public void setRecordTime(long recordTime) {
>             this.recordTime = recordTime;
>         }
>         public String getContent() {
>             return content;
>         }
>         public void setContent(String content) {
>             this.content = content;
>         }
>     }
>     private class TimestampWithEqualWatermark implements AssignerWithPunctuatedWatermarks<Object[]> {
>         /**
>          * 
>          */
>         private static final long serialVersionUID = 1L;
>         @Override
>         public long extractTimestamp(Object[] element, long previousElementTimestamp) {
>             // TODO Auto-generated method stub
>             return (long) element[0];
>         }
>         @Override
>         public Watermark checkAndGetNextWatermark(Object[] lastElement, long extractedTimestamp) {
>             return new Watermark(extractedTimestamp);
>         }
>     }
> }
> {code}
> Exception trace:
> {noformat}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: Cannot resolve [rowtime] given input [content, recordTime].
> 	at org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:143)
> 	at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:86)
> 	at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:83)
> 	at org.apache.flink.table.plan.TreeNode.postOrderTransform(TreeNode.scala:72)
> 	at org.apache.flink.table.plan.logical.LogicalNode.org$apache$flink$table$plan$logical$LogicalNode$$expressionPostOrderTransform$1(LogicalNode.scala:119)
> 	at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$7$$anonfun$apply$1.apply(LogicalNode.scala:132)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
> 	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> 	at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$7.apply(LogicalNode.scala:131)
> 	at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:742)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
> 	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
> 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
> 	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:308)
> 	at scala.collection.AbstractIterator.to(Iterator.scala:1194)
> 	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:300)
> 	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1194)
> 	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:287)
> 	at scala.collection.AbstractIterator.toArray(Iterator.scala:1194)
> 	at org.apache.flink.table.plan.logical.LogicalNode.expressionPostOrderTransform(LogicalNode.scala:137)
> 	at org.apache.flink.table.plan.logical.LogicalNode.validate(LogicalNode.scala:83)
> 	at org.apache.flink.table.plan.logical.Project.validate(operators.scala:67)
> 	at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1054)
> 	at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1073)
> 	at com.taiwanmobile.cep.noc.TumblingWindow.main(TumblingWindow.java:54)
> {noformat}


