Posted to user@flink.apache.org by Sam Huang <sa...@reflektion.com> on 2017/03/07 23:17:09 UTC

window function not working when control stream broadcast

Hi all,

I connected my data stream with my control stream and created an
event-time tumbling window, and everything works fine. But when I add
.broadcast() to the control stream, the window function no longer works.

I'm running this locally; the code is here:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = FlinkUtil.createExecutionEnvironment(args);
    DataStream<String> jsonEventStream = JsonEventStreamReader.readStream(env);
    DataStream<String> controlStream =
        JsonEventStreamReader.readControlStream(env); //.broadcast();  With broadcast, window function doesn't work
    jsonEventStream
        .flatMap(new StrToTuplesFlatMapFunImpl())
        .connect(controlStream)
        .flatMap(new DataFilterFunImpl())
        .assignTimestampsAndWatermarks(getTimestampsWatermarksAssigner())
        .keyBy(0, 1, 2, 3)
        .timeWindow(Time.seconds(WINDOW_LENGTH))
        .allowedLateness(Time.seconds(WINDOW_LATENESS))
        .reduce(new ReduceFunImpl(), new WindowFunImpl())
        .addSink(new InfluxDBSink(INFLUXDB_DB));

    env.execute();
}
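
As background for the question above, here is a minimal plain-Java sketch (no Flink dependency) of what changes when a control stream is broadcast. In Flink, broadcast() delivers every element to every parallel instance of the downstream operator; without it, each control element reaches only one instance. The class name ControlRoutingSketch and the round-robin choice are assumptions made for illustration, and the sketch does not claim to explain the behavior reported in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java simulation of two ways to route control messages to parallel subtasks. */
final class ControlRoutingSketch {

    /** Non-broadcast illustration: each message goes to exactly one subtask, round-robin. */
    static List<List<String>> roundRobin(List<String> messages, int parallelism) {
        List<List<String>> subtasks = emptySubtasks(parallelism);
        for (int i = 0; i < messages.size(); i++) {
            subtasks.get(i % parallelism).add(messages.get(i));
        }
        return subtasks;
    }

    /** Broadcast: every message is delivered to every subtask. */
    static List<List<String>> broadcast(List<String> messages, int parallelism) {
        List<List<String>> subtasks = emptySubtasks(parallelism);
        for (String message : messages) {
            for (List<String> subtask : subtasks) {
                subtask.add(message);
            }
        }
        return subtasks;
    }

    /** Creates one empty inbox per parallel subtask. */
    private static List<List<String>> emptySubtasks(int parallelism) {
        List<List<String>> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new ArrayList<>());
        }
        return subtasks;
    }

    public static void main(String[] args) {
        List<String> control = Arrays.asList("ruleA", "ruleB", "ruleC");
        System.out.println("round-robin: " + roundRobin(control, 2));
        System.out.println("broadcast:   " + broadcast(control, 2));
    }
}
```

With broadcast, each parallel copy of a function like DataFilterFunImpl would see the full set of control messages; without it, each copy sees only a part.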

Re: window function not working when control stream broadcast

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Sam,

could you please also send the code of the timestamp/watermark assigner?
This could also affect things.


Best,

Aljoscha
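
For readers unfamiliar with what a timestamp/watermark assigner does, the following is a hedged plain-Java sketch of one common strategy: the watermark trails the largest timestamp seen so far by a fixed bound. The class and method names are hypothetical, and the assigner actually used in the job above was not posted in this thread.

```java
/** Plain-Java sketch of a "bounded out-of-orderness" watermark computation. */
final class BoundedLatenessWatermarkSketch {

    private final long maxOutOfOrdernessMillis;
    private long maxSeenTimestamp = Long.MIN_VALUE; // no event observed yet

    BoundedLatenessWatermarkSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    /** Records an event timestamp; late events do not move the maximum backwards. */
    void observe(long eventTimestampMillis) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTimestampMillis);
    }

    /** The watermark asserts that no event older than this value is still expected. */
    long currentWatermark() {
        if (maxSeenTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // avoid underflow before the first event
        }
        return maxSeenTimestamp - maxOutOfOrdernessMillis;
    }

    public static void main(String[] args) {
        BoundedLatenessWatermarkSketch assigner = new BoundedLatenessWatermarkSketch(2_000L);
        assigner.observe(10_000L);
        assigner.observe(9_000L); // out-of-order event
        System.out.println("watermark = " + assigner.currentWatermark());
    }
}
```

An instance that never observes an event keeps its watermark at the minimum value, which is why the assigner's code matters when a window does not fire.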





On Thu, Mar 9, 2017, at 19:58, Sam Huang wrote:

> Hi Aljoscha,
>
> Here's the code:
>
> private static class DataFilterFunImpl extends
>         RichCoFlatMapFunction<KVTuple6, String, KVTuple6> {
>     private JSONParser parser;
>     private Map<String, Map<String, ControlJsonConfig>> whiteListMap = new HashMap<>();
>
>     @Override
>     // tuple5(domain, device_type, type, key, count_or_sum)
>     public void flatMap1(KVTuple6 dataTuple, Collector<KVTuple6> collector) throws Exception {
>         String type = dataTuple.f2;
>         String[] keyValue = dataTuple.f3.split(RawEventExtractor.Constants.DEFAULT_VALUE_SP);
>         String key = keyValue[0];
>         switch (type) {
>             case RawEventExtractor.Constants.VALUE_COUNT: {
>                 if (whiteListMap.containsKey(key)) {
>                     ControlJsonConfig ruleConfig =
>                             whiteListMap.get(key).get(RawEventExtractor.Constants.VALUE_COUNT);
>                     if (ruleConfig != null) {
>                         String value = keyValue.length > 1 ? keyValue[1] : "";
>                         String bucket = ruleConfig.getBucketName(value);
>                         if (bucket != null) {
>                             dataTuple.setField(
>                                     String.join(RawEventExtractor.Constants.DEFAULT_VALUE_SP, key, bucket), 3);
>                             collector.collect(dataTuple);
>                         }
>                     } else {
>                         collector.collect(dataTuple);
>                     }
>                 }
>                 break;
>             }
>             case RawEventExtractor.Constants.VALUE_SUM: {
>                 if (whiteListMap.containsKey(key)
>                         && whiteListMap.get(key).containsKey(RawEventExtractor.Constants.VALUE_SUM)) {
>                     collector.collect(dataTuple);
>                 }
>                 break;
>             }
>             default:
>                 collector.collect(dataTuple);
>         }
>     }
>
>     @Override
>     public void flatMap2(String jsonStr, Collector<KVTuple6> collector) throws Exception {
>         // Map<String, Map<String, ControlJsonConfig>> whiteListMap = whiteListMapState.value();
>         try {
>             if (parser == null) {
>                 parser = new JSONParser();
>             }
>             JSONObject jsonConfig = (JSONObject) parser.parse(jsonStr);
>             Tuple2<String, Map<String, ControlJsonConfig>> config =
>                     RawEventExtractor.getKeyConfig(jsonConfig);
>             if (config.f1 == null) {
>                 whiteListMap.remove(config.f0);
>             } else {
>                 whiteListMap.put(config.f0, config.f1);
>             }
>         } catch (Exception e) {}
>     }
> }
>
> FYI, if I setParallelism of both the control stream and data stream,
> the window function works. Is it necessary to do so for broadcast()
> function?


Re: window function not working when control stream broadcast

Posted by Sam Huang <sa...@reflektion.com>.
Hi Aljoscha,

Here's the code:

private static class DataFilterFunImpl extends
        RichCoFlatMapFunction<KVTuple6, String, KVTuple6> {
    private JSONParser parser;
    private Map<String, Map<String, ControlJsonConfig>> whiteListMap = new HashMap<>();

    @Override
    // tuple5(domain, device_type, type, key, count_or_sum)
    public void flatMap1(KVTuple6 dataTuple, Collector<KVTuple6> collector) throws Exception {
        String type = dataTuple.f2;
        String[] keyValue = dataTuple.f3.split(RawEventExtractor.Constants.DEFAULT_VALUE_SP);
        String key = keyValue[0];
        switch (type) {
            case RawEventExtractor.Constants.VALUE_COUNT: {
                if (whiteListMap.containsKey(key)) {
                    ControlJsonConfig ruleConfig =
                            whiteListMap.get(key).get(RawEventExtractor.Constants.VALUE_COUNT);
                    if (ruleConfig != null) {
                        String value = keyValue.length > 1 ? keyValue[1] : "";
                        String bucket = ruleConfig.getBucketName(value);
                        if (bucket != null) {
                            dataTuple.setField(
                                    String.join(RawEventExtractor.Constants.DEFAULT_VALUE_SP, key, bucket), 3);
                            collector.collect(dataTuple);
                        }
                    } else {
                        collector.collect(dataTuple);
                    }
                }
                break;
            }
            case RawEventExtractor.Constants.VALUE_SUM: {
                if (whiteListMap.containsKey(key)
                        && whiteListMap.get(key).containsKey(RawEventExtractor.Constants.VALUE_SUM)) {
                    collector.collect(dataTuple);
                }
                break;
            }
            default:
                collector.collect(dataTuple);
        }
    }

    @Override
    public void flatMap2(String jsonStr, Collector<KVTuple6> collector) throws Exception {
        // Map<String, Map<String, ControlJsonConfig>> whiteListMap = whiteListMapState.value();
        try {
            if (parser == null) {
                parser = new JSONParser();
            }
            JSONObject jsonConfig = (JSONObject) parser.parse(jsonStr);
            Tuple2<String, Map<String, ControlJsonConfig>> config =
                    RawEventExtractor.getKeyConfig(jsonConfig);
            if (config.f1 == null) {
                whiteListMap.remove(config.f0);
            } else {
                whiteListMap.put(config.f0, config.f1);
            }
        } catch (Exception e) {}
    }
}


FYI, if I call setParallelism on both the control stream and the data
stream, the window function works. Is that necessary when using broadcast()?
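
One mechanism worth keeping in mind when parallelism affects whether an event-time window fires: an operator with several input channels advances its event-time clock to the minimum of the watermarks received on those channels, so a single channel that never sends a watermark holds everything back. The plain-Java sketch below illustrates only that rule; the class name is hypothetical, and the thread does not establish that this is what happened in the job above.

```java
import java.util.Arrays;

/** Plain-Java sketch of how an operator with several input channels tracks event time. */
final class MinWatermarkSketch {

    private final long[] channelWatermarks;

    MinWatermarkSketch(int inputChannels) {
        channelWatermarks = new long[inputChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // nothing received yet
    }

    /** Records a watermark from one channel and returns the operator's current watermark. */
    long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        return currentWatermark();
    }

    /** The operator's event-time clock is the minimum over all input channels. */
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        MinWatermarkSketch operator = new MinWatermarkSketch(2);
        System.out.println(operator.onWatermark(0, 10_000L)); // channel 1 is still silent
        System.out.println(operator.onWatermark(1, 4_000L));  // now limited by channel 1
    }
}
```

Printing the watermark seen by each parallel instance is one way to check whether some instance stays at the minimum value.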


On Thu, Mar 9, 2017 at 2:26 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi Sam,
> could you please also send the code for the DataFilterFunImpl and your
> timestamps/watermark assigner. That could help in figuring out the problem.
>
> Best,
> Aljoscha

Re: window function not working when control stream broadcast

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Sam,

could you please also send the code for the DataFilterFunImpl and
your timestamp/watermark assigner? That could help in figuring out
the problem.

Best,
Aljoscha

On Wed, Mar 8, 2017, at 19:56, Sam Huang wrote:

> Hi Timo,
>
> The window function sinks the data into InfluxDB, and it's not
> triggered.
> If I comment the ".timeWindow", and print results after the reduce
> function, it works
> Code for window function is here:
>
> private static class WindowFunImpl implements
>         WindowFunction<KVTuple6, Point, Tuple, TimeWindow> {
>     @Override
>     public void apply(Tuple tuple, TimeWindow window, Iterable<KVTuple6> iterable,
>                       Collector<Point> collector) throws Exception {
>         KVTuple6 kvTypeTuple = iterable.iterator().next();
>         System.out.println("window: " + kvTypeTuple);  // Doesn't work here if use broadcast
>         Point.Builder builder = Point.measurement(INFLUXDB_MEASUREMENT)
>                 .time(window.getStart(), TimeUnit.MILLISECONDS)
>                 .tag(TAG_DOMAIN, kvTypeTuple.f0)
>                 .tag(TAG_DEVICE, kvTypeTuple.f1)
>                 .tag(TAG_TYPE, kvTypeTuple.f2)
>                 .tag(TAG_KEY, kvTypeTuple.f3)
>                 .addField(FIELD, kvTypeTuple.f4);
>
>         collector.collect(builder.build());
>     }
> }



Re: window function not working when control stream broadcast

Posted by Sam Huang <sa...@reflektion.com>.
Hi Timo,

The window function writes the data to InfluxDB, and it is not triggered.
If I comment out ".timeWindow" and print the results after the reduce
function, it works.
The code for the window function is here:

private static class WindowFunImpl implements
        WindowFunction<KVTuple6, Point, Tuple, TimeWindow> {
    @Override
    public void apply(Tuple tuple, TimeWindow window, Iterable<KVTuple6> iterable,
                      Collector<Point> collector) throws Exception {
        KVTuple6 kvTypeTuple = iterable.iterator().next();
        System.out.println("window: " + kvTypeTuple);  // Doesn't work here if use broadcast
        Point.Builder builder = Point.measurement(INFLUXDB_MEASUREMENT)
                .time(window.getStart(), TimeUnit.MILLISECONDS)
                .tag(TAG_DOMAIN, kvTypeTuple.f0)
                .tag(TAG_DEVICE, kvTypeTuple.f1)
                .tag(TAG_TYPE, kvTypeTuple.f2)
                .tag(TAG_KEY, kvTypeTuple.f3)
                .addField(FIELD, kvTypeTuple.f4);

        collector.collect(builder.build());
    }
}
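
For context on what "not triggered" means for an event-time tumbling window: the window covering [start, start + size) is evaluated only once the operator's watermark reaches the window's last millisecond. The plain-Java sketch below illustrates that condition under the assumption of a zero window offset; the class and method names are hypothetical, and this is an illustration rather than Flink's own implementation.

```java
/** Plain-Java sketch of tumbling-window assignment and the event-time firing condition. */
final class TumblingWindowSketch {

    /** Start of the tumbling window (offset 0) that contains the given timestamp. */
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /** The window fires once the watermark reaches its last millisecond (start + size - 1). */
    static boolean fires(long windowStartMillis, long sizeMillis, long watermarkMillis) {
        long maxTimestamp = windowStartMillis + sizeMillis - 1;
        return watermarkMillis >= maxTimestamp;
    }

    public static void main(String[] args) {
        long size = 10_000L;
        long start = windowStart(12_345L, size);
        System.out.println("window start: " + start);
        System.out.println("fires at watermark 19998? " + fires(start, size, 19_998L));
        System.out.println("fires at watermark 19999? " + fires(start, size, 19_999L));
    }
}
```

If the watermark at the window operator never advances, the reduce step still aggregates elements but the window function is never called, which matches the symptom of seeing reduced data and no window output.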


On Wed, Mar 8, 2017 at 1:10 AM, Timo Walther <tw...@apache.org> wrote:

> Hi Sam,
>
> could you explain the behavior a bit more? How does the window function
> behave? Is it not triggered or what is the content? What is the result if
> you don't use a window function?
>
> Timo

Re: window function not working when control stream broadcast

Posted by Timo Walther <tw...@apache.org>.
Hi Sam,

could you explain the behavior a bit more? How does the window function 
behave? Is it not triggered or what is the content? What is the result 
if you don't use a window function?

Timo


On 08/03/17 at 02:59, Sam Huang wrote:
> btw, the reduce function works well, I've printed out the data, and they are
> all correct. So are the timestamps and watermarks. And if I remove
> ".broadcast()", the data is successfully sinked.
>
> Any help?



Re: window function not working when control stream broadcast

Posted by Sam Huang <sa...@reflektion.com>.
By the way, the reduce function works well: I've printed out the data, and
it is all correct, as are the timestamps and watermarks. And if I remove
".broadcast()", the data is written to the sink successfully.

Any help?



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/window-function-not-working-when-control-stream-broadcast-tp12093p12094.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.