You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Mark You (JIRA)" <ji...@apache.org> on 2017/06/07 09:57:18 UTC

[jira] [Updated] (FLINK-6862) Tumble window rowtime not resolve at logic plan validation

     [ https://issues.apache.org/jira/browse/FLINK-6862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mark You updated FLINK-6862:
----------------------------
    Description: 
The following code sample works in version 1.2.1 but fails in 1.3.0:
{code:title= TumblingWindow.java|borderStyle=solid}
/**
 * Minimal reproducer for this report: builds a small in-memory stream of
 * {@link Content} records, assigns timestamps taken from each record's
 * {@code recordTime}, converts the stream to a {@code Table} and applies a
 * tumbling window of "1.hours" on the field name "rowtime".
 */
public class TumblingWindow {

    /**
     * Runs the reproducer end to end.
     *
     * @param args command-line arguments; not read by this method
     * @throws Exception declared so any failure from the calls below propagates unchanged
     */
    public static void main(String[] args) throws Exception {
        // Seven sample records as (recordTime, content) pairs.
        List<Content> data = new ArrayList<Content>();
        data.add(new Content(1L, "Hi"));
        data.add(new Content(2L, "Hallo"));
        data.add(new Content(3L, "Hello"));
        data.add(new Content(4L, "Hello"));
        data.add(new Content(7L, "Hello"));
        data.add(new Content(8L, "Hello world"));
        data.add(new Content(16L, "Hello world"));

        // The job is configured for event time.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        DataStream<Content> stream = env.fromCollection(data);

        // NOTE(review): Time.milliseconds(1) is presumably the tolerated
        // out-of-orderness bound of the extractor - confirm against the Flink docs.
        DataStream<Content> stream2 = stream.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Content>(Time.milliseconds(1)) {

                    /**
                     * Serialization version identifier of this anonymous extractor class.
                     */
                    private static final long serialVersionUID = 410512296011057717L;

                    // The timestamp of an element is its recordTime field.
                    @Override
                    public long extractTimestamp(Content element) {
                        return element.getRecordTime();
                    }

                });

        // The table is created without an explicit field list.
        Table table = tableEnv.fromDataStream(stream2);
        // The windowed result is neither assigned nor written to a sink. According to
        // the exception trace in this report, this select(...) call is where validation
        // fails with "Cannot resolve [rowtime] given input [content, recordTime]".
        table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w").select("w.start, content.count");

        env.execute();
    }

    /**
     * Serializable bean carrying one sample record: a numeric {@code recordTime}
     * (used as the element timestamp by the extractor in {@code main}) and a
     * text {@code content}.
     */
    public static class Content implements Serializable {

        private long recordTime;
        private String content;

        /** Creates an empty record; both fields keep their Java default values. */
        public Content() {
            super();
        }

        /**
         * Creates a record with both fields set.
         *
         * @param recordTime value stored in the {@code recordTime} field
         * @param content value stored in the {@code content} field
         */
        public Content(long recordTime, String content) {
            super();
            this.recordTime = recordTime;
            this.content = content;
        }

        /** @return the stored {@code recordTime} */
        public long getRecordTime() {
            return recordTime;
        }

        /** @param recordTime new value for the {@code recordTime} field */
        public void setRecordTime(long recordTime) {
            this.recordTime = recordTime;
        }

        /** @return the stored {@code content} */
        public String getContent() {
            return content;
        }

        /** @param content new value for the {@code content} field */
        public void setContent(String content) {
            this.content = content;
        }

    }

    /**
     * Assigner that takes the timestamp from the first array slot and emits a
     * watermark equal to every extracted timestamp.
     *
     * NOTE(review): this class is not referenced anywhere in the visible sample.
     */
    private class TimestampWithEqualWatermark implements AssignerWithPunctuatedWatermarks<Object[]> {

        /**
         * Serialization version identifier.
         */
        private static final long serialVersionUID = 1L;

        @Override
        public long extractTimestamp(Object[] element, long previousElementTimestamp) {
            // Casts the first array slot to long; previousElementTimestamp is ignored.
            // NOTE(review): assumes element[0] holds a Long - confirm against the caller.
            return (long) element[0];
        }

        @Override
        public Watermark checkAndGetNextWatermark(Object[] lastElement, long extractedTimestamp) {
            // The watermark is the timestamp just extracted; lastElement is not inspected.
            return new Watermark(extractedTimestamp);
        }

    }
}
{code}

{noformat}
Exception trace
Exception in thread "main" org.apache.flink.table.api.ValidationException: Cannot resolve [rowtime] given input [content, recordTime].
	at org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:143)
	at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:86)
	at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:83)
	at org.apache.flink.table.plan.TreeNode.postOrderTransform(TreeNode.scala:72)
	at org.apache.flink.table.plan.logical.LogicalNode.org$apache$flink$table$plan$logical$LogicalNode$$expressionPostOrderTransform$1(LogicalNode.scala:119)
	at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$7$$anonfun$apply$1.apply(LogicalNode.scala:132)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$7.apply(LogicalNode.scala:131)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
	at scala.collection.Iterator$class.foreach(Iterator.scala:742)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:308)
	at scala.collection.AbstractIterator.to(Iterator.scala:1194)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:300)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1194)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:287)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1194)
	at org.apache.flink.table.plan.logical.LogicalNode.expressionPostOrderTransform(LogicalNode.scala:137)
	at org.apache.flink.table.plan.logical.LogicalNode.validate(LogicalNode.scala:83)
	at org.apache.flink.table.plan.logical.Project.validate(operators.scala:67)
	at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1054)
	at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1073)
	at com.taiwanmobile.cep.noc.TumblingWindow.main(TumblingWindow.java:54)
{noformat}

  was:
The following code sample works in version 1.2.1 but fails in 1.3.0:
{code:title=Bar.java|borderStyle=solid}
/**
 * Minimal reproducer for this report: builds a small in-memory stream of
 * {@link Content} records, assigns timestamps taken from each record's
 * {@code recordTime}, converts the stream to a {@code Table} and applies a
 * tumbling window of "1.hours" on the field name "rowtime".
 */
public class TumblingWindow {

    /**
     * Runs the reproducer end to end.
     *
     * @param args command-line arguments; not read by this method
     * @throws Exception declared so any failure from the calls below propagates unchanged
     */
    public static void main(String[] args) throws Exception {
        // Seven sample records as (recordTime, content) pairs.
        List<Content> data = new ArrayList<Content>();
        data.add(new Content(1L, "Hi"));
        data.add(new Content(2L, "Hallo"));
        data.add(new Content(3L, "Hello"));
        data.add(new Content(4L, "Hello"));
        data.add(new Content(7L, "Hello"));
        data.add(new Content(8L, "Hello world"));
        data.add(new Content(16L, "Hello world"));

        // The job is configured for event time.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        DataStream<Content> stream = env.fromCollection(data);

        // NOTE(review): Time.milliseconds(1) is presumably the tolerated
        // out-of-orderness bound of the extractor - confirm against the Flink docs.
        DataStream<Content> stream2 = stream.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Content>(Time.milliseconds(1)) {

                    /**
                     * Serialization version identifier of this anonymous extractor class.
                     */
                    private static final long serialVersionUID = 410512296011057717L;

                    // The timestamp of an element is its recordTime field.
                    @Override
                    public long extractTimestamp(Content element) {
                        return element.getRecordTime();
                    }

                });

        // The table is created without an explicit field list.
        Table table = tableEnv.fromDataStream(stream2);
        // The windowed result is neither assigned nor written to a sink. According to
        // the exception trace in this report, this select(...) call is where validation
        // fails with "Cannot resolve [rowtime] given input [content, recordTime]".
        table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w").select("w.start, content.count");

        env.execute();
    }

    /**
     * Serializable bean carrying one sample record: a numeric {@code recordTime}
     * (used as the element timestamp by the extractor in {@code main}) and a
     * text {@code content}.
     */
    public static class Content implements Serializable {

        private long recordTime;
        private String content;

        /** Creates an empty record; both fields keep their Java default values. */
        public Content() {
            super();
        }

        /**
         * Creates a record with both fields set.
         *
         * @param recordTime value stored in the {@code recordTime} field
         * @param content value stored in the {@code content} field
         */
        public Content(long recordTime, String content) {
            super();
            this.recordTime = recordTime;
            this.content = content;
        }

        /** @return the stored {@code recordTime} */
        public long getRecordTime() {
            return recordTime;
        }

        /** @param recordTime new value for the {@code recordTime} field */
        public void setRecordTime(long recordTime) {
            this.recordTime = recordTime;
        }

        /** @return the stored {@code content} */
        public String getContent() {
            return content;
        }

        /** @param content new value for the {@code content} field */
        public void setContent(String content) {
            this.content = content;
        }

    }

    /**
     * Assigner that takes the timestamp from the first array slot and emits a
     * watermark equal to every extracted timestamp.
     *
     * NOTE(review): this class is not referenced anywhere in the visible sample.
     */
    private class TimestampWithEqualWatermark implements AssignerWithPunctuatedWatermarks<Object[]> {

        /**
         * Serialization version identifier.
         */
        private static final long serialVersionUID = 1L;

        @Override
        public long extractTimestamp(Object[] element, long previousElementTimestamp) {
            // Casts the first array slot to long; previousElementTimestamp is ignored.
            // NOTE(review): assumes element[0] holds a Long - confirm against the caller.
            return (long) element[0];
        }

        @Override
        public Watermark checkAndGetNextWatermark(Object[] lastElement, long extractedTimestamp) {
            // The watermark is the timestamp just extracted; lastElement is not inspected.
            return new Watermark(extractedTimestamp);
        }

    }
}
{code}

{noformat}
Exception trace
Exception in thread "main" org.apache.flink.table.api.ValidationException: Cannot resolve [rowtime] given input [content, recordTime].
	at org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:143)
	at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:86)
	at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:83)
	at org.apache.flink.table.plan.TreeNode.postOrderTransform(TreeNode.scala:72)
	at org.apache.flink.table.plan.logical.LogicalNode.org$apache$flink$table$plan$logical$LogicalNode$$expressionPostOrderTransform$1(LogicalNode.scala:119)
	at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$7$$anonfun$apply$1.apply(LogicalNode.scala:132)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$7.apply(LogicalNode.scala:131)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
	at scala.collection.Iterator$class.foreach(Iterator.scala:742)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:308)
	at scala.collection.AbstractIterator.to(Iterator.scala:1194)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:300)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1194)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:287)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1194)
	at org.apache.flink.table.plan.logical.LogicalNode.expressionPostOrderTransform(LogicalNode.scala:137)
	at org.apache.flink.table.plan.logical.LogicalNode.validate(LogicalNode.scala:83)
	at org.apache.flink.table.plan.logical.Project.validate(operators.scala:67)
	at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1054)
	at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1073)
	at com.taiwanmobile.cep.noc.TumblingWindow.main(TumblingWindow.java:54)
{noformat}


> Tumble window rowtime not resolve at logic plan validation
> ----------------------------------------------------------
>
>                 Key: FLINK-6862
>                 URL: https://issues.apache.org/jira/browse/FLINK-6862
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.3.0
>            Reporter: Mark You
>
> The following code sample works in version 1.2.1 but fails in 1.3.0:
> {code:title= TumblingWindow.java|borderStyle=solid}
> public class TumblingWindow {
>     public static void main(String[] args) throws Exception {
>         List<Content> data = new ArrayList<Content>();
>         data.add(new Content(1L, "Hi"));
>         data.add(new Content(2L, "Hallo"));
>         data.add(new Content(3L, "Hello"));
>         data.add(new Content(4L, "Hello"));
>         data.add(new Content(7L, "Hello"));
>         data.add(new Content(8L, "Hello world"));
>         data.add(new Content(16L, "Hello world"));
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>         final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
>         DataStream<Content> stream = env.fromCollection(data);
>         DataStream<Content> stream2 = stream.assignTimestampsAndWatermarks(
>                 new BoundedOutOfOrdernessTimestampExtractor<Content>(Time.milliseconds(1)) {
>                     /**
>                      * 
>                      */
>                     private static final long serialVersionUID = 410512296011057717L;
>                     @Override
>                     public long extractTimestamp(Content element) {
>                         return element.getRecordTime();
>                     }
>                 });
>         Table table = tableEnv.fromDataStream(stream2);
>         table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w").select("w.start, content.count");
>         env.execute();
>     }
>     public static class Content implements Serializable {
>         private long recordTime;
>         private String content;
>         public Content() {
>             super();
>         }
>         public Content(long recordTime, String content) {
>             super();
>             this.recordTime = recordTime;
>             this.content = content;
>         }
>         public long getRecordTime() {
>             return recordTime;
>         }
>         public void setRecordTime(long recordTime) {
>             this.recordTime = recordTime;
>         }
>         public String getContent() {
>             return content;
>         }
>         public void setContent(String content) {
>             this.content = content;
>         }
>     }
>     private class TimestampWithEqualWatermark implements AssignerWithPunctuatedWatermarks<Object[]> {
>         /**
>          * 
>          */
>         private static final long serialVersionUID = 1L;
>         @Override
>         public long extractTimestamp(Object[] element, long previousElementTimestamp) {
>             // TODO Auto-generated method stub
>             return (long) element[0];
>         }
>         @Override
>         public Watermark checkAndGetNextWatermark(Object[] lastElement, long extractedTimestamp) {
>             return new Watermark(extractedTimestamp);
>         }
>     }
> }
> {code}
> {noformat}
> Exception trace
> Exception in thread "main" org.apache.flink.table.api.ValidationException: Cannot resolve [rowtime] given input [content, recordTime].
> 	at org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:143)
> 	at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:86)
> 	at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:83)
> 	at org.apache.flink.table.plan.TreeNode.postOrderTransform(TreeNode.scala:72)
> 	at org.apache.flink.table.plan.logical.LogicalNode.org$apache$flink$table$plan$logical$LogicalNode$$expressionPostOrderTransform$1(LogicalNode.scala:119)
> 	at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$7$$anonfun$apply$1.apply(LogicalNode.scala:132)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
> 	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> 	at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$7.apply(LogicalNode.scala:131)
> 	at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:742)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
> 	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
> 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
> 	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:308)
> 	at scala.collection.AbstractIterator.to(Iterator.scala:1194)
> 	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:300)
> 	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1194)
> 	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:287)
> 	at scala.collection.AbstractIterator.toArray(Iterator.scala:1194)
> 	at org.apache.flink.table.plan.logical.LogicalNode.expressionPostOrderTransform(LogicalNode.scala:137)
> 	at org.apache.flink.table.plan.logical.LogicalNode.validate(LogicalNode.scala:83)
> 	at org.apache.flink.table.plan.logical.Project.validate(operators.scala:67)
> 	at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1054)
> 	at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1073)
> 	at com.taiwanmobile.cep.noc.TumblingWindow.main(TumblingWindow.java:54)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)