Posted to user@flink.apache.org by Sam Huang <sa...@reflektion.com> on 2017/03/07 23:17:09 UTC
window function not working when control stream broadcast
Hi all,
I connected my data stream with my control stream and created an event-time
tumbling window, and everything works fine. But when I add .broadcast() to
the control stream, the window function no longer works.
I'm running this locally. The code is here:
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = FlinkUtil.createExecutionEnvironment(args);
    DataStream<String> jsonEventStream = JsonEventStreamReader.readStream(env);
    // With .broadcast() appended to the next statement, the window function doesn't work
    DataStream<String> controlStream =
            JsonEventStreamReader.readControlStream(env); //.broadcast();
    jsonEventStream
            .flatMap(new StrToTuplesFlatMapFunImpl())
            .connect(controlStream)
            .flatMap(new DataFilterFunImpl())
            .assignTimestampsAndWatermarks(getTimestampsWatermarksAssigner())
            .keyBy(0, 1, 2, 3)
            .timeWindow(Time.seconds(WINDOW_LENGTH))
            .allowedLateness(Time.seconds(WINDOW_LATENESS))
            .reduce(new ReduceFunImpl(), new WindowFunImpl())
            .addSink(new InfluxDBSink(INFLUXDB_DB));
    env.execute();
}
Re: window function not working when control stream broadcast
Posted by Aljoscha Krettek <al...@apache.org>.
Hi Sam,
could you please also send the code of the timestamp/watermark assigner?
This could also affect things.
Best,
Aljoscha
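For context on why the assigner can matter here: in Flink, an operator that receives watermarks from several parallel input channels advances its event-time clock to the minimum of those watermarks. If one parallel instance of the assigner never advances its watermark, downstream event-time windows cannot fire. Whether that is what happens in Sam's job is not confirmed anywhere in this thread. A minimal plain-Java sketch of the rule follows; the class and method names are made up for illustration, and this is not Flink's own implementation.

```java
import java.util.Arrays;

/** Plain-Java illustration (no Flink dependency) of combining watermarks from parallel channels. */
final class WatermarkMinSketch {

    /** The downstream event-time clock is the minimum watermark over all input channels. */
    static long combinedWatermark(long[] channelWatermarks) {
        return Arrays.stream(channelWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Hypothetical window [0, 10000): its last timestamp is 9999.
        long windowMaxTimestamp = 9_999L;

        // All three upstream assigner instances have seen data.
        long[] allAdvancing = {12_000L, 11_500L, 12_300L};
        // The second instance has seen no data, so it is still at Long.MIN_VALUE.
        long[] oneIdle = {12_000L, Long.MIN_VALUE, 12_300L};

        System.out.println(combinedWatermark(allAdvancing) >= windowMaxTimestamp); // true
        System.out.println(combinedWatermark(oneIdle) >= windowMaxTimestamp);      // false
    }
}
```

In the second case the records themselves still flow through, which would match a job where printing after the reduce shows data but the window never emits.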
On Thu, Mar 9, 2017, at 19:58, Sam Huang wrote:
> Hi Aljoscha,
>
> Here's the code:
> private static class DataFilterFunImpl extends
>         RichCoFlatMapFunction<KVTuple6, String, KVTuple6> {
>     private JSONParser parser;
>     private Map<String, Map<String, ControlJsonConfig>> whiteListMap = new HashMap<>();
>
>     // tuple5(domain, device_type, type, key, count_or_sum)
>     @Override
>     public void flatMap1(KVTuple6 dataTuple, Collector<KVTuple6> collector)
>             throws Exception {
>         String type = dataTuple.f2;
>         String[] keyValue =
>                 dataTuple.f3.split(RawEventExtractor.Constants.DEFAULT_VALUE_SP);
>         String key = keyValue[0];
>         switch (type) {
>             case RawEventExtractor.Constants.VALUE_COUNT: {
>                 if (whiteListMap.containsKey(key)) {
>                     ControlJsonConfig ruleConfig =
>                             whiteListMap.get(key).get(RawEventExtractor.Constants.VALUE_COUNT);
>                     if (ruleConfig != null) {
>                         String value = keyValue.length > 1 ? keyValue[1] : "";
>                         String bucket = ruleConfig.getBucketName(value);
>                         if (bucket != null) {
>                             dataTuple.setField(
>                                     String.join(RawEventExtractor.Constants.DEFAULT_VALUE_SP, key, bucket),
>                                     3);
>                             collector.collect(dataTuple);
>                         }
>                     } else {
>                         collector.collect(dataTuple);
>                     }
>                 }
>                 break;
>             }
>             case RawEventExtractor.Constants.VALUE_SUM: {
>                 if (whiteListMap.containsKey(key)
>                         && whiteListMap.get(key).containsKey(RawEventExtractor.Constants.VALUE_SUM)) {
>                     collector.collect(dataTuple);
>                 }
>                 break;
>             }
>             default:
>                 collector.collect(dataTuple);
>         }
>     }
>
>     @Override
>     public void flatMap2(String jsonStr, Collector<KVTuple6> collector)
>             throws Exception {
>         // Map<String, Map<String, ControlJsonConfig>> whiteListMap = whiteListMapState.value();
>         try {
>             if (parser == null) {
>                 parser = new JSONParser();
>             }
>             JSONObject jsonConfig = (JSONObject) parser.parse(jsonStr);
>             Tuple2<String, Map<String, ControlJsonConfig>> config =
>                     RawEventExtractor.getKeyConfig(jsonConfig);
>             if (config.f1 == null) {
>                 whiteListMap.remove(config.f0);
>             } else {
>                 whiteListMap.put(config.f0, config.f1);
>             }
>         } catch (Exception e) {}
>     }
> }
>
> FYI, if I setParallelism of both the control stream and data stream,
> the window function works. Is it necessary to do so for broadcast()
> function?
Re: window function not working when control stream broadcast
Posted by Sam Huang <sa...@reflektion.com>.
Hi Aljoscha,
Here's the code:
private static class DataFilterFunImpl extends
        RichCoFlatMapFunction<KVTuple6, String, KVTuple6> {
    private JSONParser parser;
    private Map<String, Map<String, ControlJsonConfig>> whiteListMap = new HashMap<>();

    // tuple5(domain, device_type, type, key, count_or_sum)
    @Override
    public void flatMap1(KVTuple6 dataTuple, Collector<KVTuple6> collector)
            throws Exception {
        String type = dataTuple.f2;
        String[] keyValue =
                dataTuple.f3.split(RawEventExtractor.Constants.DEFAULT_VALUE_SP);
        String key = keyValue[0];
        switch (type) {
            case RawEventExtractor.Constants.VALUE_COUNT: {
                if (whiteListMap.containsKey(key)) {
                    ControlJsonConfig ruleConfig =
                            whiteListMap.get(key).get(RawEventExtractor.Constants.VALUE_COUNT);
                    if (ruleConfig != null) {
                        String value = keyValue.length > 1 ? keyValue[1] : "";
                        String bucket = ruleConfig.getBucketName(value);
                        if (bucket != null) {
                            dataTuple.setField(
                                    String.join(RawEventExtractor.Constants.DEFAULT_VALUE_SP, key, bucket),
                                    3);
                            collector.collect(dataTuple);
                        }
                    } else {
                        collector.collect(dataTuple);
                    }
                }
                break;
            }
            case RawEventExtractor.Constants.VALUE_SUM: {
                if (whiteListMap.containsKey(key)
                        && whiteListMap.get(key).containsKey(RawEventExtractor.Constants.VALUE_SUM)) {
                    collector.collect(dataTuple);
                }
                break;
            }
            default:
                collector.collect(dataTuple);
        }
    }

    @Override
    public void flatMap2(String jsonStr, Collector<KVTuple6> collector)
            throws Exception {
        // Map<String, Map<String, ControlJsonConfig>> whiteListMap = whiteListMapState.value();
        try {
            if (parser == null) {
                parser = new JSONParser();
            }
            JSONObject jsonConfig = (JSONObject) parser.parse(jsonStr);
            Tuple2<String, Map<String, ControlJsonConfig>> config =
                    RawEventExtractor.getKeyConfig(jsonConfig);
            if (config.f1 == null) {
                whiteListMap.remove(config.f0);
            } else {
                whiteListMap.put(config.f0, config.f1);
            }
        } catch (Exception e) {}
    }
}
FYI, if I call setParallelism on both the control stream and the data stream,
the window function works. Is that necessary when using broadcast()?
On Thu, Mar 9, 2017 at 2:26 AM, Aljoscha Krettek <al...@apache.org>
wrote:
> Hi Sam,
> could you please also send the code for the DataFilterFunImpl and your
> timestamps/watermark assigner. That could help in figuring out the problem.
>
> Best,
> Aljoscha
Re: window function not working when control stream broadcast
Posted by Aljoscha Krettek <al...@apache.org>.
Hi Sam,
could you please also send the code for the DataFilterFunImpl and
your timestamps/watermark assigner. That could help in figuring out
the problem.
Best,
Aljoscha
On Wed, Mar 8, 2017, at 19:56, Sam Huang wrote:
> Hi Timo,
>
> The window function sinks the data into InfluxDB, and it's not
> triggered.
> If I comment the ".timeWindow", and print results after the reduce
> function, it works
> Code for window function is here:
>
> private static class WindowFunImpl implements
>         WindowFunction<KVTuple6, Point, Tuple, TimeWindow> {
>     @Override
>     public void apply(Tuple tuple, TimeWindow window, Iterable<KVTuple6> iterable,
>             Collector<Point> collector) throws Exception {
>         KVTuple6 kvTypeTuple = iterable.iterator().next();
>         System.out.println("window: " + kvTypeTuple); // Doesn't work here if use broadcast
>         Point.Builder builder = Point.measurement(INFLUXDB_MEASUREMENT)
>                 .time(window.getStart(), TimeUnit.MILLISECONDS)
>                 .tag(TAG_DOMAIN, kvTypeTuple.f0)
>                 .tag(TAG_DEVICE, kvTypeTuple.f1)
>                 .tag(TAG_TYPE, kvTypeTuple.f2)
>                 .tag(TAG_KEY, kvTypeTuple.f3)
>                 .addField(FIELD, kvTypeTuple.f4);
>         collector.collect(builder.build());
>     }
> }
Re: window function not working when control stream broadcast
Posted by Sam Huang <sa...@reflektion.com>.
Hi Timo,
The window function sinks the data into InfluxDB, and it's not triggered.
If I comment out ".timeWindow" and print the results after the reduce
function, it works.
The code for the window function is here:
private static class WindowFunImpl implements
        WindowFunction<KVTuple6, Point, Tuple, TimeWindow> {
    @Override
    public void apply(Tuple tuple, TimeWindow window, Iterable<KVTuple6> iterable,
            Collector<Point> collector) throws Exception {
        KVTuple6 kvTypeTuple = iterable.iterator().next();
        System.out.println("window: " + kvTypeTuple); // Doesn't work here if use broadcast
        Point.Builder builder = Point.measurement(INFLUXDB_MEASUREMENT)
                .time(window.getStart(), TimeUnit.MILLISECONDS)
                .tag(TAG_DOMAIN, kvTypeTuple.f0)
                .tag(TAG_DEVICE, kvTypeTuple.f1)
                .tag(TAG_TYPE, kvTypeTuple.f2)
                .tag(TAG_KEY, kvTypeTuple.f3)
                .addField(FIELD, kvTypeTuple.f4);
        collector.collect(builder.build());
    }
}
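As background for "not triggered": with Flink's default event-time trigger, a tumbling window [start, end) fires once the watermark reaches the window's last timestamp (end - 1), and with allowed lateness the window state is kept until the watermark reaches that timestamp plus the lateness. The sketch below restates those rules in plain Java without any Flink dependency. The 60 s window and 10 s lateness are hypothetical stand-ins for WINDOW_LENGTH and WINDOW_LATENESS, whose values are not given in this thread, and the class and method names are made up for illustration.

```java
/** Plain-Java restatement of tumbling-window assignment and firing rules (window offset 0). */
final class TumblingWindowSketch {

    /** Start of the tumbling window that contains the given timestamp. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** Largest timestamp that still belongs to the window [start, start + size). */
    static long maxTimestamp(long windowStartMs, long sizeMs) {
        return windowStartMs + sizeMs - 1;
    }

    /** The window result can be emitted once the watermark reaches the window's last timestamp. */
    static boolean canFire(long windowStartMs, long sizeMs, long watermarkMs) {
        return watermarkMs >= maxTimestamp(windowStartMs, sizeMs);
    }

    /** After this point the window state is dropped and late elements for it are discarded. */
    static boolean isExpired(long windowStartMs, long sizeMs, long latenessMs, long watermarkMs) {
        return maxTimestamp(windowStartMs, sizeMs) + latenessMs <= watermarkMs;
    }

    public static void main(String[] args) {
        long sizeMs = 60_000L;     // hypothetical value for WINDOW_LENGTH (60 s)
        long latenessMs = 10_000L; // hypothetical value for WINDOW_LATENESS (10 s)

        long start = windowStart(125_000L, sizeMs);
        System.out.println("window start: " + start);
        System.out.println("fires at watermark 150000: " + canFire(start, sizeMs, 150_000L));
        System.out.println("fires at watermark 179999: " + canFire(start, sizeMs, 179_999L));
        System.out.println("expired at watermark 189999: "
                + isExpired(start, sizeMs, latenessMs, 189_999L));
    }
}
```

Under these rules, a window function that is never called while the records before it look correct points at the watermark seen by the window operator rather than at the window function itself.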
On Wed, Mar 8, 2017 at 1:10 AM, Timo Walther <tw...@apache.org> wrote:
> Hi Sam,
>
> could you explain the behavior a bit more? How does the window function
> behave? Is it not triggered or what is the content? What is the result if
> you don't use a window function?
>
> Timo
Re: window function not working when control stream broadcast
Posted by Timo Walther <tw...@apache.org>.
Hi Sam,
could you explain the behavior a bit more? How does the window function
behave? Is it not triggered or what is the content? What is the result
if you don't use a window function?
Timo
On 08/03/17 at 02:59, Sam Huang wrote:
> btw, the reduce function works well, I've printed out the data, and they are
> all correct. So are the timestamps and watermarks. And if I remove
> ".broadcast()", the data is successfully sinked.
>
> Any help?
Re: window function not working when control stream broadcast
Posted by Sam Huang <sa...@reflektion.com>.
Btw, the reduce function works well: I've printed out the data, and it is
all correct. So are the timestamps and watermarks. And if I remove
".broadcast()", the data is successfully written to the sink.
Any help?
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/window-function-not-working-when-control-stream-broadcast-tp12093p12094.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.