Posted to user@flink.apache.org by Tarandeep Singh <ta...@gmail.com> on 2017/03/16 06:30:53 UTC

Data+control stream from kafka + window function - not working

Hi,

I am using Flink 1.2 and reading a data stream from Kafka (using
FlinkKafkaConsumer08). I want to connect this data stream with another
stream (a control stream) so as to do some filtering on the fly. After
filtering, I apply a window function (tumbling/sliding event-time window)
along with a fold function. However, the window function never gets called.

Any help to debug/fix this is greatly appreciated!

Below is reproducible code that you can run in an IDE such as IntelliJ or on
a Flink cluster. You will need a running Kafka cluster (local or
otherwise).
Create a topic and add the test data points-

$KAFKA_HOME/bin/kafka-topics.sh --create --topic test --zookeeper
localhost:2181 --replication-factor 1 --partitions 1
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092
--topic test < small_input.csv

where small_input.csv contains the following lines-

p1,10.0f,2017-03-14 16:01:01
p1,10.0f,2017-03-14 16:01:02
p1,10.0f,2017-03-14 16:01:03
p1,10.0f,2017-03-14 16:01:04
p1,10.0f,2017-03-14 16:01:05
p1,10.0f,2017-03-14 16:01:10
p1,10.0f,2017-03-14 16:01:11
p1,10.0f,2017-03-14 16:01:12
p1,10.0f,2017-03-14 16:01:40
p1,10.0f,2017-03-14 16:01:50

Now you can run the code given below. Note:

1) In this example, I am not reading the control stream from Kafka (but the
issue can be reproduced that way as well; a sketch of that variant follows
these notes)
2) If, instead of reading the data stream from Kafka, I create the stream
from elements (i.e. use the getInput function instead of the getKafkaInput
function), the code works and the window function is fired.
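
For reference, the control stream could also be backed by Kafka along these
lines (a rough sketch reusing the readKafkaStream helper from the code below;
the topic name "control-topic" is just a placeholder, and each control message
is assumed to be a plain product name such as "p1"):

DataStream<Tuple1<String>> control = readKafkaStream("control-topic", env)
        .map(new MapFunction<String, Tuple1<String>>() {
            @Override
            public Tuple1<String> map(String line) throws Exception {
                // each control message carries just a product name
                return new Tuple1<>(line.trim());
            }
        });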

Thanks,
Tarandeep



import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;

public class Test3 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        //DataStream<Product> product = getInput(env);
        DataStream<Product> product = getKafkaInput(env);
        DataStream<Tuple1<String>> control = getControl(env);

        DataStream<Product> filteredStream = product.keyBy(0)
                .connect(control.keyBy(0))
                .flatMap(new CoFlatMapFunImpl());

        DataStream<Product> watermarkedStream = filteredStream.assignTimestampsAndWatermarks(
                getTimestampAssigner(Time.seconds(1))).setParallelism(3);

        watermarkedStream.transform("WatermarkDebugger", watermarkedStream.getType(), new WatermarkDebugger<Product>());

        watermarkedStream
                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .fold(new NameCount("", 0), new FoldFunImpl(), new WindowFunImpl())
                .print();

        env.execute();
    }

    /**
     * If instead of reading from Kafka, create stream from elements, the
     * code works and window function is fired!
     */
    private static DataStream<Product> getInput(StreamExecutionEnvironment env) {
        return env.fromCollection(Arrays.asList(
            new Product("p1",10.0f,"2017-03-14 16:01:01"),
            new Product("p1",10.0f,"2017-03-14 16:01:02"),
            new Product("p1",10.0f,"2017-03-14 16:01:03"),
            new Product("p1",10.0f,"2017-03-14 16:01:04"),
            new Product("p1",10.0f,"2017-03-14 16:01:05"),
            new Product("p1",10.0f,"2017-03-14 16:01:10"),
            new Product("p1",10.0f,"2017-03-14 16:01:11"),
            new Product("p1",10.0f,"2017-03-14 16:01:12"),
            new Product("p1",10.0f,"2017-03-14 16:01:40"),
            new Product("p1",10.0f,"2017-03-14 16:01:50")
        ));
    }

    private static DataStream<Product> getKafkaInput(StreamExecutionEnvironment env) throws IOException {
        DataStream<String> s = readKafkaStream("test", env);

        return s.map(new MapFunction<String, Product>() {
            @Override
            public Product map(String s) throws Exception {
                String[] fields = s.split(",");
                return new Product(fields[0], Float.parseFloat(fields[1]), fields[2]);
            }
        });
    }

    private static DataStream<Tuple1<String>> getControl(StreamExecutionEnvironment env) {
        return env.fromElements(new Tuple1<>("p1"));
    }

    private static class CoFlatMapFunImpl extends RichCoFlatMapFunction<Product, Tuple1<String>, Product> {

        private Set<String> productNames = new HashSet<>(Arrays.asList("p1"));

        @Override
        public void flatMap1(Product product, Collector<Product> collector) throws Exception {
            if (productNames.contains(product.f0)) {
                collector.collect(product);
                System.out.println("Retaining product " + product + " in data stream");
            }
        }

        @Override
        public void flatMap2(Tuple1<String> t, Collector<Product> collector) throws Exception {
            productNames.add(t.f0);
            System.out.println("Adding product to set:" + t.f0);
        }
    }

    private static class FoldFunImpl implements FoldFunction<Product, NameCount> {
        @Override
        public NameCount fold(NameCount current, Product p) throws Exception {
            current.f0 = p.f0;
            current.f1 += 1;
            return current;
        }
    }

    /**
     * WINDOW FUNCTION NEVER GETS CALLED.
     */
    private static class WindowFunImpl extends RichWindowFunction<NameCount, NameCount, Tuple, TimeWindow> {
        @Override
        public void apply(Tuple key, TimeWindow timeWindow, Iterable<NameCount> iterable,
                          Collector<NameCount> collector) throws Exception {
            NameCount nc = iterable.iterator().next();
            collector.collect(nc);
            System.out.println("WINDOW: start time: " + new Date(timeWindow.getStart()) + " " + nc);
        }
    }

    private static BoundedOutOfOrdernessTimestampExtractor<Product> getTimestampAssigner(final Time maxOutOfOrderness) {
        final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        return new BoundedOutOfOrdernessTimestampExtractor<Product>(maxOutOfOrderness) {
            @Override
            public long extractTimestamp(Product p) {
                long ts = 0L;
                try {
                    ts = dateFormat.parse(p.f2).getTime();
                } catch (Exception e) {}
                return ts;
            }
        };
    }

    public static class Product extends Tuple3<String, Float, String> {
        public Product() {}
        public Product(String name, Float price, String dateTime) {
            super(name, price, dateTime);
        }
    }

    public static class NameCount extends Tuple2<String, Integer> {
        public NameCount() {}
        public NameCount(String name, Integer count) {
            super(name, count);
        }
    }

    private static DataStream<String> readKafkaStream(String topic, StreamExecutionEnvironment env) throws IOException {

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "group-0009");
        properties.setProperty("auto.offset.reset", "smallest");
        return env.addSource(new FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(), properties));
    }

    public static class WatermarkDebugger<T>
            extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(StreamRecord<T> element) throws Exception {
            System.out.println("ELEMENT: " + element);
            output.collect(element);
        }

        @Override
        public void processWatermark(Watermark mark) throws Exception {
            super.processWatermark(mark);
            System.out.println("WM: " + mark);
        }
    }
}

Re: Data+control stream from kafka + window function - not working

Posted by Aljoscha Krettek <al...@apache.org>.
What do you get from the System.out printing in CoFlatMapFunImpl? Could it be that all the elements are being processed before the control input element arrives and that they are therefore dropped?
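
One way to rule that out would be to buffer products whose name has not yet been seen on the control stream instead of dropping them, and to flush the buffer when the control element arrives. A rough sketch (plain in-memory buffering, just for the experiment; it reuses the Product and Tuple1 types and the imports from the posted job):

    private static class BufferingCoFlatMapFunImpl
            extends RichCoFlatMapFunction<Product, Tuple1<String>, Product> {

        private final Set<String> productNames = new HashSet<>();
        private final Map<String, List<Product>> pending = new HashMap<>();

        @Override
        public void flatMap1(Product product, Collector<Product> out) throws Exception {
            if (productNames.contains(product.f0)) {
                out.collect(product);
            } else {
                // control element not seen yet -> park the product instead of dropping it
                List<Product> buffered = pending.get(product.f0);
                if (buffered == null) {
                    buffered = new ArrayList<>();
                    pending.put(product.f0, buffered);
                }
                buffered.add(product);
            }
        }

        @Override
        public void flatMap2(Tuple1<String> control, Collector<Product> out) throws Exception {
            productNames.add(control.f0);
            // flush anything that arrived before this control element
            List<Product> buffered = pending.remove(control.f0);
            if (buffered != null) {
                for (Product p : buffered) {
                    out.collect(p);
                }
            }
        }
    }

If the window starts firing with a change like this, the products were indeed being dropped before the control element was processed.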

> On 17 Mar 2017, at 09:14, Tarandeep Singh <ta...@gmail.com> wrote:
> 
> Hi Gordon,
> 
> When I use getInput (input created via collection), then watermarks are always Long.MAX_VALUE: 
> WM: Watermark @ 9223372036854775807
> 
> This is understandable as input source has finished so a watermark of value Long.MAX_VALUE is emitted.
> 
> When I use getKafkaInput, I get this watermark:
> WM: Watermark @ 1489532509000
> 
> This corresponds to Tue Mar 14 2017 16:01:49, which seems right (last record's timestamp: 2017-03-14 16:01:50 minus 1 sec due to maxOutOfOrder value).
> 
> If I *don't* use control stream, I also get correct watermark and this time window function is called and correct aggregated values are generated.
> 
> Thanks,
> Tarandeep
> 
> 
> 
> On Thu, Mar 16, 2017 at 10:25 PM, Tzu-Li (Gordon) Tai <tzulitai@apache.org> wrote:
> Hi Tarandeep,
> 
> Thanks for clarifying.
> 
> For the next step, I would recommend taking a look at https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/debugging_event_time.html and trying to find out what exactly is wrong with the watermark progression. Flink 1.2 exposes watermarks as a metric, and that should help in figuring out why the windows aren’t firing.
> 
> Also, I see you have added a “WatermarkDebugger” in your job. Have you checked whether or not the watermarks printed there are identical (using getInput vs. getKafkaInput)?
> 
> Cheers,
> Gordon
> 
> 
> On March 17, 2017 at 12:32:51 PM, Tarandeep Singh (tarandeep@gmail.com) wrote:
> 
>> Anyone?
>> Any suggestions what could be going wrong or what I am doing wrong?
>> 
>> Thanks,
>> Tarandeep
>> 
>> 
>> On Thu, Mar 16, 2017 at 7:34 AM, Tarandeep Singh <tarandeep@gmail.com> wrote:
>> Data is read from Kafka and yes I use different group id every time I run the code. I have put break points and print statements to verify that.
>> 
>> Also, if I don't connect with control stream the window function works. 
>> 
>> - Tarandeep
>> 
>> On Mar 16, 2017, at 1:12 AM, Tzu-Li (Gordon) Tai <tzulitai@apache.org> wrote:
>> 
>>> Hi Tarandeep,
>>> 
>>> I haven’t looked at the rest of the code yet, but my first guess is that you might not be reading any data from Kafka at all:
>>> 
>>>> private static DataStream<String> readKafkaStream(String topic, StreamExecutionEnvironment env) throws IOException {
>>>> 
>>>>         Properties properties = new Properties();
>>>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>>>         properties.setProperty("zookeeper.connect", "localhost:2181");
>>>>         properties.setProperty("group.id", "group-0009");
>>>>         properties.setProperty("auto.offset.reset", "smallest");
>>>>         return env.addSource(new FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(), properties));
>>>>     }
>>> 
>>> 
>>> Have you tried using a different “group.id” every time you’re re-running the job?
>>> Note that the “auto.offset.reset” value is only respected when there aren’t any offsets for the group committed in Kafka.
>>> So you might not actually be reading the complete “small_input.csv” dataset, unless you use a different group.id each time.
>>> 
>>> Cheers,
>>> Gordon
>>> 
>>> On March 16, 2017 at 2:39:10 PM, Tarandeep Singh (tarandeep@gmail.com) wrote:
>>> 
>>>> [original message and code snipped]
>> 
> 


Re: Data+control stream from kafka + window function - not working

Posted by Tarandeep Singh <ta...@gmail.com>.
Hi Gordon,

When I use getInput (input created from a collection), the watermarks are
always Long.MAX_VALUE:
WM: Watermark @ 9223372036854775807

This is understandable: the input source has finished, so a watermark of
Long.MAX_VALUE is emitted.

When I use getKafkaInput, I get this watermark:
WM: Watermark @ 1489532509000

This corresponds to Tue Mar 14 2017 16:01:49, which seems right (the last
record's timestamp, 2017-03-14 16:01:50, minus 1 second due to the
maxOutOfOrderness value).

If I *don't* use the control stream, I also get the correct watermark, and
this time the window function is called and correct aggregated values are
generated.

Thanks,
Tarandeep
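
That also matches what BoundedOutOfOrdernessTimestampExtractor does: it tracks the
largest timestamp seen so far and periodically emits a watermark of that value minus
the allowed out-of-orderness. A quick back-of-the-envelope check with the sample data
(assuming the timestamps were parsed in the poster's local time zone by the
SimpleDateFormat):

    long currentMaxTimestamp = 1489532510000L;  // last record, "2017-03-14 16:01:50"
    long maxOutOfOrderness   = 1000L;           // Time.seconds(1)
    long watermark = currentMaxTimestamp - maxOutOfOrderness;  // 1489532509000 -> 16:01:49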



On Thu, Mar 16, 2017 at 10:25 PM, Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> Hi Tarandeep,
>
> Thanks for clarifying.
>
> For the next step, I would recommend taking a look at
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/
> debugging_event_time.html and try to find out what exactly is wrong with
> the watermark progression. Flink 1.2 exposes watermarks as a metric, and
> that should help in figuring out why the windows aren’t firing.
>
> Also, I see you have added a “WatermarkDebugger” in your job. Have you
> checked whether or not the watermarks printed there are identical (using
> getInput v.s. getKafkaInput)?
>
> Cheers,
> Gordon
>
>
> On March 17, 2017 at 12:32:51 PM, Tarandeep Singh (tarandeep@gmail.com)
> wrote:
>
> Anyone?
> Any suggestions what could be going wrong or what I am doing wrong?
>
> Thanks,
> Tarandeep
>
>
> On Thu, Mar 16, 2017 at 7:34 AM, Tarandeep Singh <ta...@gmail.com>
> wrote:
>
>> Data is read from Kafka and yes I use different group id every time I run
>> the code. I have put break points and print statements to verify that.
>>
>> Also, if I don't connect with control stream the window function works.
>>
>> - Tarandeep
>>
>> On Mar 16, 2017, at 1:12 AM, Tzu-Li (Gordon) Tai <tz...@apache.org>
>> wrote:
>>
>> Hi Tarandeep,
>>
>> I haven’t looked at the rest of the code yet, but my first guess is that
>> you might not be reading any data from Kafka at all:
>>
>> private static DataStream<String> readKafkaStream(String topic, StreamExecutionEnvironment env) throws IOException {
>>
>>         Properties properties = new Properties();
>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>         properties.setProperty("zookeeper.connect", "localhost:2181");
>>         properties.setProperty("group.id", "group-0009");
>>         properties.setProperty("auto.offset.reset", "smallest");
>>         return env.addSource(new FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(), properties));
>>     }
>>
>>
>> Have you tried using a different “group.id” every time you’re re-running
>> the job?
>> Note that the “auto.offset.reset” value is only respected when there
>> aren’t any offsets for the group committed in Kafka.
>> So you might not actually be reading the complete “small_input.csv”
>> dataset, unless you use a different group.id each time.
>>
>> Cheers,
>> Gordon
>>
>> On March 16, 2017 at 2:39:10 PM, Tarandeep Singh (tarandeep@gmail.com)
>> wrote:
>>
>> [original message and code snipped]
>>
>

Re: Data+control stream from kafka + window function - not working

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Tarandeep,

Thanks for clarifying.

For the next step, I would recommend taking a look at https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/debugging_event_time.html and trying to find out what exactly is wrong with the watermark progression. Flink 1.2 exposes watermarks as a metric, and that should help in figuring out why the windows aren’t firing.

Also, I see you have added a “WatermarkDebugger” in your job. Have you checked whether or not the watermarks printed there are identical (using getInput vs. getKafkaInput)?

Cheers,
Gordon

On March 17, 2017 at 12:32:51 PM, Tarandeep Singh (tarandeep@gmail.com) wrote:

Anyone?
Any suggestions what could be going wrong or what I am doing wrong?

Thanks,
Tarandeep


On Thu, Mar 16, 2017 at 7:34 AM, Tarandeep Singh <ta...@gmail.com> wrote:
Data is read from Kafka and yes I use different group id every time I run the code. I have put break points and print statements to verify that.

Also, if I don't connect with control stream the window function works. 

- Tarandeep

On Mar 16, 2017, at 1:12 AM, Tzu-Li (Gordon) Tai <tz...@apache.org> wrote:

Hi Tarandeep,

I haven’t looked at the rest of the code yet, but my first guess is that you might not be reading any data from Kafka at all:

private static DataStream<String> readKafkaStream(String topic, StreamExecutionEnvironment env) throws IOException {

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "group-0009");
        properties.setProperty("auto.offset.reset", "smallest");
        return env.addSource(new FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(), properties));
    }

Have you tried using a different “group.id” every time you’re re-running the job?
Note that the “auto.offset.reset” value is only respected when there aren’t any offsets for the group committed in Kafka.
So you might not actually be reading the complete “small_input.csv” dataset, unless you use a different group.id each time.

Cheers,
Gordon
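
One easy way to get a fresh consumer group on every run is to derive the group id from
something unique, for example (a sketch; the UUID suffix is only illustrative):

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("zookeeper.connect", "localhost:2181");
    // a new group has no committed offsets, so auto.offset.reset=smallest applies
    // and the whole topic is re-read from the beginning on every run
    properties.setProperty("group.id", "group-" + java.util.UUID.randomUUID());
    properties.setProperty("auto.offset.reset", "smallest");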

On March 16, 2017 at 2:39:10 PM, Tarandeep Singh (tarandeep@gmail.com) wrote:

[original message and code snipped]




Re: Data+control stream from kafka + window function - not working

Posted by Tarandeep Singh <ta...@gmail.com>.
Anyone?
Any suggestions what could be going wrong or what I am doing wrong?

Thanks,
Tarandeep


On Thu, Mar 16, 2017 at 7:34 AM, Tarandeep Singh <ta...@gmail.com>
wrote:

> Data is read from Kafka and yes I use different group id every time I run
> the code. I have put break points and print statements to verify that.
>
> Also, if I don't connect with control stream the window function works.
>
> - Tarandeep
>
> On Mar 16, 2017, at 1:12 AM, Tzu-Li (Gordon) Tai <tz...@apache.org>
> wrote:
>
> Hi Tarandeep,
>
> I haven’t looked at the rest of the code yet, but my first guess is that
> you might not be reading any data from Kafka at all:
>
> private static DataStream<String> readKafkaStream(String topic, StreamExecutionEnvironment env) throws IOException {
>
>         Properties properties = new Properties();
>         properties.setProperty("bootstrap.servers", "localhost:9092");
>         properties.setProperty("zookeeper.connect", "localhost:2181");
>         properties.setProperty("group.id", "group-0009");
>         properties.setProperty("auto.offset.reset", "smallest");
>         return env.addSource(new FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(), properties));
>     }
>
>
> Have you tried using a different “group.id” every time you’re re-running
> the job?
> Note that the “auto.offset.reset” value is only respected when there
> aren’t any offsets for the group committed in Kafka.
> So you might not actually be reading the complete “small_input.csv”
> dataset, unless you use a different group.id each time.
>
> Cheers,
> Gordon
>
> On March 16, 2017 at 2:39:10 PM, Tarandeep Singh (tarandeep@gmail.com)
> wrote:
>
> Hi,
>
> I am using flink-1.2 and reading data stream from Kafka (using
> FlinkKafkaConsumer08). I want to connect this data stream with another
> stream (read control stream) so as to do some filtering on the fly. After
> filtering, I am applying window function (tumbling/sliding event window)
> along with fold function. However, the window function does not get called.
>
> Any help to debug/fix this is greatly appreciated!
>
> Below is a reproducible code that one can run in IDE like IntelliJ or on
> flink cluster. You will need to have a running Kafka cluster (local or
> otherwise).
> Create a topic and add test data points-
>
> $KAFKA_HOME/bin/kafka-topics.sh --create --topic test --zookeeper
> localhost:2181 --replication-factor 1 --partitions 1
> $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092
> --topic test < small_input.csv
>
> where small_input.csv contains the following lines-
>
> p1,10.0f,2017-03-14 16:01:01
> p1,10.0f,2017-03-14 16:01:02
> p1,10.0f,2017-03-14 16:01:03
> p1,10.0f,2017-03-14 16:01:04
> p1,10.0f,2017-03-14 16:01:05
> p1,10.0f,2017-03-14 16:01:10
> p1,10.0f,2017-03-14 16:01:11
> p1,10.0f,2017-03-14 16:01:12
> p1,10.0f,2017-03-14 16:01:40
> p1,10.0f,2017-03-14 16:01:50
>
> Now you can run the code given below. Note:
>
> 1) In this example, I am not reading control stream from Kafka (but issue
> can be reproduced with this code as well)
> 2) If instead of reading data stream from kafka, I create stream from
> elements (i.e. use getInput function instead of getKafkaInput function),
> the code works and window function is fired.
>
> Thanks,
> Tarandeep
>
>
>
> import org.apache.flink.api.common.functions.FoldFunction;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.tuple.Tuple;
> import org.apache.flink.api.java.tuple.Tuple1;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
> import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
> import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
> import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
> import org.apache.flink.streaming.api.watermark.Watermark;
> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
> import org.apache.flink.util.Collector;
>
> import java.io.IOException;
> import java.text.DateFormat;
> import java.text.SimpleDateFormat;
> import java.util.*;
>
> public class Test3 {
>
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>         //DataStream<Product> product = getInput(env);
>         DataStream<Product> product = getKafkaInput(env);
>         DataStream<Tuple1<String>> control= getControl(env);
>
>         DataStream<Product> filteredStream = product.keyBy(0)
>                 .connect(control.keyBy(0))
>                 .flatMap(new CoFlatMapFunImpl());
>
>         DataStream<Product> watermarkedStream = filteredStream.assignTimestampsAndWatermarks(
>                 getTimestampAssigner(Time.seconds(1))).setParallelism(3);
>
>         watermarkedStream.transform("WatermarkDebugger", watermarkedStream.getType(), new WatermarkDebugger<Product>());
>
>         watermarkedStream
>                 .keyBy(0)
>                 .window(TumblingEventTimeWindows.of(Time.seconds(5)))
>                 .fold(new NameCount("", 0), new FoldFunImpl(), new WindowFunImpl())
>                 .print();
>
>         env.execute();
>     }
>
>     /**
>      * If instead of reading from Kafka, create stream from elements, the
>      * code works and window function is fired!
>      */
>     private static DataStream<Product> getInput(StreamExecutionEnvironment env) {
>         return env.fromCollection(Arrays.asList(
>             new Product("p1",10.0f,"2017-03-14 16:01:01"),
>             new Product("p1",10.0f,"2017-03-14 16:01:02"),
>             new Product("p1",10.0f,"2017-03-14 16:01:03"),
>             new Product("p1",10.0f,"2017-03-14 16:01:04"),
>             new Product("p1",10.0f,"2017-03-14 16:01:05"),
>             new Product("p1",10.0f,"2017-03-14 16:01:10"),
>             new Product("p1",10.0f,"2017-03-14 16:01:11"),
>             new Product("p1",10.0f,"2017-03-14 16:01:12"),
>             new Product("p1",10.0f,"2017-03-14 16:01:40"),
>             new Product("p1",10.0f,"2017-03-14 16:01:50")
>         ));
>     }
>
>     private static DataStream<Product> getKafkaInput(StreamExecutionEnvironment env) throws IOException {
>         DataStream<String> s = readKafkaStream("test", env);
>
>         return s.map(new MapFunction<String, Product>() {
>             @Override
>             public Product map(String s) throws Exception {
>                 String[] fields = s.split(",");
>                 return new Product(fields[0], Float.parseFloat(fields[1]), fields[2]);
>             }
>         });
>     }
>
>     private static DataStream<Tuple1<String>> getControl(StreamExecutionEnvironment env) {
>         return env.fromElements(new Tuple1<>("p1"));
>     }
>
>     private static class CoFlatMapFunImpl extends RichCoFlatMapFunction<Product, Tuple1<String>,Product> {
>
>         private Set<String> productNames = new HashSet<>(Arrays.asList("p1"));
>
>         @Override
>         public void flatMap1(Product product, Collector<Product> collector) throws Exception {
>             if (productNames.contains(product.f0)) {
>                 collector.collect(product);
>                 System.out.println("Retaining product " + product + " in data stream");
>             }
>         }
>
>         @Override
>         public void flatMap2(Tuple1<String> t, Collector<Product> collector) throws Exception {
>             productNames.add(t.f0);
>             System.out.println("Adding product to set:" + t.f0);
>         }
>     }
>
>     private static class FoldFunImpl implements FoldFunction<Product,NameCount> {
>         @Override
>         public NameCount fold(NameCount current, Product p) throws Exception {
>             current.f0 = p.f0;
>             current.f1 += 1;
>             return current;
>         }
>     }
>
>     /**
>      * WINDOW FUNCTION NEVER GETS CALLED.
>      */
>     private static class WindowFunImpl extends RichWindowFunction<NameCount,NameCount,Tuple,TimeWindow> {
>         @Override
>         public void apply(Tuple key, TimeWindow timeWindow, Iterable<NameCount> iterable,
>                           Collector<NameCount> collector) throws Exception {
>             NameCount nc = iterable.iterator().next();
>             collector.collect(nc);
>             System.out.println("WINDOW: start time: " + new Date(timeWindow.getStart()) + " " + nc);
>         }
>     }
>
>     private static BoundedOutOfOrdernessTimestampExtractor<Product> getTimestampAssigner(final Time maxOutOfOrderness) {
>         final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
>
>         return new BoundedOutOfOrdernessTimestampExtractor<Product>(maxOutOfOrderness) {
>             @Override
>             public long extractTimestamp(Product p) {
>                 long ts = 0L;
>                 try {
>                     ts = dateFormat.parse(p.f2).getTime();
>                 } catch (Exception e) {}
>                 return ts;
>             }
>         };
>     }
>
>     public static class Product extends Tuple3<String,Float,String> {
>         public Product() {}
>         public Product(String name, Float price, String dateTime) {
>             super(name, price, dateTime);
>         }
>     }
>
>     public static class NameCount extends Tuple2<String,Integer> {
>         public NameCount() {}
>         public NameCount(String name, Integer count) {
>             super(name, count);
>         }
>     }
>
>     private static DataStream<String> readKafkaStream(String topic, StreamExecutionEnvironment env) throws IOException {
>
>         Properties properties = new Properties();
>         properties.setProperty("bootstrap.servers", "localhost:9092");
>         properties.setProperty("zookeeper.connect", "localhost:2181");
>         properties.setProperty("group.id", "group-0009");
>         properties.setProperty("auto.offset.reset", "smallest");
>         return env.addSource(new FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(), properties));
>     }
>
>     public static class WatermarkDebugger<T>
>             extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
>         private static final long serialVersionUID = 1L;
>
>         @Override
>         public void processElement(StreamRecord<T> element) throws Exception {
>             System.out.println("ELEMENT: " + element);
>             output.collect(element);
>         }
>
>         @Override
>         public void processWatermark(Watermark mark) throws Exception {
>             super.processWatermark(mark);
>             System.out.println("WM: " + mark);
>         }
>     }
> }
>
>
>

Re: Data+control stream from kafka + window function - not working

Posted by Tarandeep Singh <ta...@gmail.com>.
Data is read from Kafka, and yes, I use a different group.id every time I run the code. I have put breakpoints and print statements to verify that.

Also, if I don't connect with the control stream, the window function works.
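
For reference, the working variant simply skips the connect/co-flatMap step and windows the Kafka stream directly. A minimal sketch, reusing the helpers from my original code (getKafkaInput, getTimestampAssigner, FoldFunImpl, WindowFunImpl):

        // Sketch only: same pipeline minus the control-stream connect.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<Product> product = getKafkaInput(env);

        DataStream<Product> watermarked = product.assignTimestampsAndWatermarks(
                getTimestampAssigner(Time.seconds(1)));

        watermarked
                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .fold(new NameCount("", 0), new FoldFunImpl(), new WindowFunImpl())
                .print();   // the window function fires in this version

        env.execute();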

- Tarandeep

> On Mar 16, 2017, at 1:12 AM, Tzu-Li (Gordon) Tai <tz...@apache.org> wrote:
> 
> Hi Tarandeep,
> 
> I haven’t looked at the rest of the code yet, but my first guess is that you might not be reading any data from Kafka at all:
> 
>> private static DataStream<String> readKafkaStream(String topic, StreamExecutionEnvironment env) throws IOException {
>> 
>>         Properties properties = new Properties();
>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>         properties.setProperty("zookeeper.connect", "localhost:2181");
>>         properties.setProperty("group.id", "group-0009");
>>         properties.setProperty("auto.offset.reset", "smallest");
>>         return env.addSource(new FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(), properties));
>>     }
> 
> 
> Have you tried using a different “group.id” every time you’re re-running the job?
> Note that the “auto.offset.reset” value is only respected when there aren’t any offsets for the group committed in Kafka.
> So you might not actually be reading the complete “small_input.csv” dataset, unless you use a different group.id each time.
> 
> Cheers,
> Gordon

Re: Data+control stream from kafka + window function - not working

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Tarandeep,

I haven’t looked at the rest of the code yet, but my first guess is that you might not be reading any data from Kafka at all:

private static DataStream<String> readKafkaStream(String topic, StreamExecutionEnvironment env) throws IOException {

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "group-0009");
        properties.setProperty("auto.offset.reset", "smallest");
        return env.addSource(new FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(), properties));
    }

Have you tried using a different “group.id” every time you’re re-running the job?
Note that the “auto.offset.reset” value is only respected when there aren’t any offsets for the group committed in Kafka.
So you might not actually be reading the complete “small_input.csv” dataset, unless you use a different group.id each time.
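
For example, one way to guarantee a fresh consumer group on every run is to derive the group.id from a random value. A minimal sketch of that tweak to readKafkaStream (the UUID-based name is just an illustrative choice, not something the connector requires):

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        // fresh consumer group per run, so "auto.offset.reset" always takes effect
        properties.setProperty("group.id", "group-" + UUID.randomUUID());
        properties.setProperty("auto.offset.reset", "smallest");
        return env.addSource(new FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(), properties));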

Cheers,
Gordon
