Posted to user@flink.apache.org by Plamen Paskov <pl...@next-stream.com> on 2017/12/14 15:30:06 UTC

Streaming Table API - strange behavior

Hi,

I'm trying to run the following streaming program in my local Flink
1.3.2 environment. The program compiles and runs without any errors, but
the print() call doesn't display anything. Once I stop the program, I
receive all the aggregated data. Any ideas how to make it emit output
regularly, or whenever new data arrives or old data is updated?

package flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.Slide;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.sql.Timestamp;


public class StreamingJob {
     public static void main(String[] args) throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
         StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);


         SingleOutputStreamOperator<WC> input = env
                 .socketTextStream("localhost", 9000, "\n")
                 // Parse "word,frequency,timestamp" lines into WC records.
                 .map(new MapFunction<String, WC>() {
                     @Override
                     public WC map(String value) throws Exception {
                         String[] row = value.split(",");
                         Timestamp timestamp = Timestamp.valueOf(row[2]);
                         return new WC(row[0], Long.valueOf(row[1]), timestamp);
                     }
                 })
                 // Watermark = highest timestamp seen so far minus 10 seconds.
                 .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<WC>(Time.seconds(10)) {
                     @Override
                     public long extractTimestamp(WC element) {
                         return element.dt.getTime();
                     }
                 });


         tEnv.registerDataStream("WordCount", input, "word, frequency, dt.rowtime");

         Table table = tEnv.scan("WordCount")
                 .window(Slide.over("10.seconds").every("5.seconds").on("dt").as("w"))
                 .groupBy("w, word")
                 .select("word, frequency.sum as frequency, w.start as dt");

         DataStream<Tuple2<Boolean, WC>> result = tEnv.toRetractStream(table, WC.class);
         result.print();

         env.execute();
     }

     public static class WC {
         public String word;
         public long frequency;
         public Timestamp dt;

         public WC() {
         }

         public WC(String word, long frequency, Timestamp dt) {
             this.word = word;
             this.frequency = frequency;
             this.dt = dt;
         }

         @Override
         public String toString() {
             return "WC " + word + " " + frequency + " " + dt.getTime();
         }
     }
}


Sample input:

hello,1,2017-12-14 13:10:01
ciao,1,2017-12-14 13:10:02
hello,1,2017-12-14 13:10:03
hello,1,2017-12-14 13:10:04


Thanks


Re: Streaming Table API - strange behavior

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

Yes, you are right. I forgot that the interval is set by default when
enabling event time.

Also your comment about triggering the window is correct. Technically, you
don't need a record that falls into the next window, but just a watermark
that is past the window boundary.
In your case, watermarks only advance if the assigner sees more records and
you'd need a record with a timestamp of at least 2017-12-14 13:10:15 (or
16), because the watermark assigner subtracts 10 seconds.
Given the current watermark assigner, there is no other way than sending
more records to trigger a window computation. You can implement a custom
assigner to also emit watermarks without data, but that would somewhat bind
the event-time watermarks to the clock of the generating machine such that
watermarks wouldn't be only data-driven.
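
The idea of emitting watermarks without data can be sketched as follows. This is a minimal plain-Java model with no Flink dependency, not a tested Flink assigner: the class name, the idle timeout and the injectable clock are all hypothetical. In a Flink 1.3 job the same logic would presumably sit in a class implementing AssignerWithPeriodicWatermarks<WC>. Once no record has arrived for longer than the idle timeout, the watermark starts to advance with the wall clock, which is exactly the coupling to the machine clock described above.

```java
import java.util.function.LongSupplier;

// Plain-Java model of a periodic watermark generator that keeps advancing
// while no records arrive. All names here are hypothetical.
final class IdleAwareWatermarkSketch {
    private final long maxOutOfOrdernessMs; // same role as Time.seconds(10) in the job
    private final long idleTimeoutMs;       // idle time tolerated before the clock takes over
    private final LongSupplier clock;       // wall clock in ms, injectable for testing

    private long maxTimestampMs = Long.MIN_VALUE;
    private long lastEventWallClockMs;
    private long lastWatermarkMs = Long.MIN_VALUE;

    IdleAwareWatermarkSketch(long maxOutOfOrdernessMs, long idleTimeoutMs, LongSupplier clock) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
        this.clock = clock;
    }

    // Called once per record: remember the highest event timestamp and when it arrived.
    long extractTimestamp(long eventTimestampMs) {
        maxTimestampMs = Math.max(maxTimestampMs, eventTimestampMs);
        lastEventWallClockMs = clock.getAsLong();
        return eventTimestampMs;
    }

    // Called periodically (in a real job: once per auto-watermark interval).
    long getCurrentWatermark() {
        if (maxTimestampMs == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // no record seen yet, nothing to report
        }
        long idleForMs = clock.getAsLong() - lastEventWallClockMs;
        // Only the idle time beyond the timeout moves the watermark forward.
        long idleAdvanceMs = Math.max(0L, idleForMs - idleTimeoutMs);
        long candidate = maxTimestampMs - maxOutOfOrdernessMs + idleAdvanceMs;
        // Never let the watermark go backwards when records resume.
        lastWatermarkMs = Math.max(lastWatermarkMs, candidate);
        return lastWatermarkMs;
    }

    public static void main(String[] args) {
        long[] now = {0L}; // fake wall clock so the demo is deterministic
        IdleAwareWatermarkSketch w = new IdleAwareWatermarkSketch(10_000L, 5_000L, () -> now[0]);
        w.extractTimestamp(100_000L);
        System.out.println(w.getCurrentWatermark()); // 90000: data-driven watermark
        now[0] = 12_000L;                            // 12 s without any record
        System.out.println(w.getCurrentWatermark()); // 97000: advanced by the 7 s beyond the timeout
    }
}
```

The trade-off is the one named above: results now depend on when records arrive at the machine, not only on their timestamps, so records that show up after an idle period may be treated as late.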

Best, Fabian

2017-12-14 17:25 GMT+01:00 Plamen Paskov <pl...@next-stream.com>:

> Hi Fabian,
>
> Thank you for your response! I think it's not necessary to do that because
> I already have this call:
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> which does exactly what you say: it sets the watermark interval to 200 ms.
> I think I found the problem: it seems to be the default event-time trigger attached to the assigner.
> According to the docs at https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html, "all the event-time window assigners have an EventTimeTrigger as default trigger.
> This trigger simply fires once the watermark passes the end of a window." So all I have to do to trigger the computation is to send an event that falls into the "next" window.
> The question now is: how can I set a trigger to fire at regular intervals (e.g. every 5 seconds) using the Table API?

Re: Streaming Table API - strange behavior

Posted by Plamen Paskov <pl...@next-stream.com>.
Hi Fabian,

Thank you for your response! I think it's not necessary to do that
because I already have this call:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

which does exactly what you say: it sets the watermark interval to 200 ms.
I think I found the problem: it seems to be the default event-time trigger attached to the assigner.
According to the docs at https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html, "all the event-time window assigners have an EventTimeTrigger as default
trigger. This trigger simply fires once the watermark passes the end of
a window." So all I have to do to trigger the computation is to send an event that falls into the "next" window.
The question now is: how can I set a trigger to fire at regular intervals (e.g. every 5 seconds) using the Table API?
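
For the sample input, the point at which the default trigger fires can be worked out with a little arithmetic. The sketch below is plain Java, not Flink code, and rests on stated assumptions: the input timestamps are read as UTC, sliding windows are aligned to the epoch with no offset, a window fires once the watermark reaches its end minus one millisecond, and the watermark is the highest timestamp seen minus the 10-second bound. The class and method names are hypothetical.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

// Plain-Java arithmetic for 10-second windows sliding every 5 seconds.
final class SlidingWindowFiringSketch {

    // Parses "yyyy-MM-dd HH:mm:ss" as UTC epoch milliseconds (assumption: UTC).
    static long millis(String text) {
        return LocalDateTime.parse(text.replace(' ', 'T')).toEpochSecond(ZoneOffset.UTC) * 1000L;
    }

    // Start times of all sliding windows that contain the timestamp, latest first.
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    // Smallest record timestamp that lifts the watermark (max timestamp - bound)
    // to windowEnd - 1 ms, the point at which the window is assumed to fire.
    static long minTimestampToFire(long windowEndMs, long maxOutOfOrdernessMs) {
        return windowEndMs - 1L + maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        long size = 10_000L;
        long slide = 5_000L;
        long bound = 10_000L;
        long event = millis("2017-12-14 13:10:01");
        for (long start : windowStarts(event, size, slide)) {
            long end = start + size;
            System.out.println("window [" + start + ", " + end + ") fires once a record with timestamp >= "
                    + minTimestampToFire(end, bound) + " ms has been seen");
        }
    }
}
```

Under these assumptions the record at 13:10:01 belongs to the windows starting at 13:09:55 and 13:10:00. The earlier one ends at 13:10:05, so with whole-second timestamps it fires only after a record stamped 13:10:15 or later arrives; a record that merely falls into the next window is not enough.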


On 14.12.2017 17:57, Fabian Hueske wrote:
> Hi,
>
> you are using a BoundedOutOfOrdernessTimestampExtractor to generate 
> watermarks.
> The BoundedOutOfOrdernessTimestampExtractor is a periodic watermark 
> assigner and only generates watermarks if a watermark interval is 
> configured.
> Without watermarks, the query cannot "make progress" and only computes 
> its result when the program is closed (sources emit a MAX_LONG 
> watermark when being canceled).
>
> Long story short: you need to configure the watermark interval: 
> env.getConfig().setAutoWatermarkInterval(100L);
>
> Best, Fabian


Re: Streaming Table API - strange behavior

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

you are using a BoundedOutOfOrdernessTimestampExtractor to generate
watermarks.
The BoundedOutOfOrdernessTimestampExtractor is a periodic watermark
assigner and only generates watermarks if a watermark interval is
configured.
Without watermarks, the query cannot "make progress" and only computes its
result when the program is closed (sources emit a MAX_LONG watermark when
being canceled).
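
As a rough model of how a bounded-out-of-orderness assigner derives its watermark, the sketch below tracks the highest timestamp seen and subtracts the bound. It is plain Java without any Flink dependency; the class name is hypothetical and the sample timestamps are read as UTC for simplicity. Applied to the four sample records, it shows how far the watermark can get from that input alone.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Plain-Java model: watermark = highest timestamp seen so far - allowed out-of-orderness.
final class BoundedWatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampMs;

    BoundedWatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start high enough that the subtraction below cannot underflow.
        this.maxTimestampMs = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    // Parses "yyyy-MM-dd HH:mm:ss" as UTC epoch milliseconds (assumption: UTC).
    static long millis(String text) {
        return LocalDateTime.parse(text.replace(' ', 'T')).toEpochSecond(ZoneOffset.UTC) * 1000L;
    }

    // Called for every record.
    void observe(long eventTimestampMs) {
        maxTimestampMs = Math.max(maxTimestampMs, eventTimestampMs);
    }

    // Called whenever a watermark is due.
    long currentWatermark() {
        return maxTimestampMs - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch wm = new BoundedWatermarkSketch(10_000L);
        String[] sample = {
            "2017-12-14 13:10:01", "2017-12-14 13:10:02",
            "2017-12-14 13:10:03", "2017-12-14 13:10:04"
        };
        for (String ts : sample) {
            wm.observe(millis(ts));
        }
        long firstWindowEnd = millis("2017-12-14 13:10:05");
        System.out.println("watermark (ms):        " + wm.currentWatermark());
        System.out.println("first window end (ms): " + firstWindowEnd);
        System.out.println("watermark past window: " + (wm.currentWatermark() >= firstWindowEnd - 1L));
    }
}
```

With a 10-second bound, the watermark after the last sample record corresponds to 13:09:54, which is still before the end of any window that contains the records, so no result can be emitted until later records arrive or the job is stopped.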

Long story short: you need to configure the watermark interval:
env.getConfig().setAutoWatermarkInterval(100L);

Best, Fabian
