Posted to user@flink.apache.org by suman shil <cn...@gmail.com> on 2021/08/19 04:43:24 UTC

Pre shuffle aggregation in flink is not working

I am trying to do pre-shuffle aggregation in Flink. Following is the
MapBundleFunction implementation.

public class TaxiFareMapBundleFunction extends MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> {

    @Override
    public TaxiFare addInput(@Nullable TaxiFare value, TaxiFare input) throws Exception {
        if (value == null) {
            return input;
        }
        value.tip = value.tip + input.tip;
        return value;
    }

    @Override
    public void finishBundle(Map<Long, TaxiFare> buffer, Collector<TaxiFare> out) throws Exception {
        for (Map.Entry<Long, TaxiFare> entry : buffer.entrySet()) {
            out.collect(entry.getValue());
        }
    }
}

I am using "CountBundleTrigger.java". But the pre-shuffle aggregation is
not working as the "count" variable is always 0. Please let me know if I
am missing something.

    @Override
    public void onElement(T element) throws Exception {
        count++;
        if (count >= maxCount) {
            callback.finishBundle();
            reset();
        }
    }

Here is the main code.

        MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> mapBundleFunction =
                new TaxiFareMapBundleFunction();
        BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(10);
        KeySelector<TaxiFare, Long> taxiFareLongKeySelector = new KeySelector<TaxiFare, Long>() {
            @Override
            public Long getKey(TaxiFare value) throws Exception {
                return value.driverId;
            }
        };
        DataStream<Tuple3<Long, Long, Float>> hourlyTips =
//                fares.keyBy((TaxiFare fare) -> fare.driverId)
//                        .window(TumblingEventTimeWindows.of(Time.hours(1))).process(new AddTips());
                fares.transform("preshuffle",
                                TypeInformation.of(TaxiFare.class),
                                new TaxiFareStream(mapBundleFunction, bundleTrigger, taxiFareLongKeySelector))
                        .assignTimestampsAndWatermarks(
                                new BoundedOutOfOrdernessTimestampExtractor<TaxiFare>(Time.seconds(20)) {
                                    @Override
                                    public long extractTimestamp(TaxiFare element) {
                                        return element.startTime.getEpochSecond();
                                    }
                                })
                        .keyBy((TaxiFare fare) -> fare.driverId)
                        .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                        .process(new AddTips());
        DataStream<Tuple3<Long, Long, Float>> hourlyMax =
                hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);

Thanks
Suman

Re: Pre shuffle aggregation in flink is not working

Posted by JING ZHANG <be...@gmail.com>.
Hi Suman,
I've studied the provided code and have some questions.
1. Why do we do a window aggregate with
window(TumblingProcessingTimeWindows.of(Time.minutes(1))),
and then a windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2)?
One uses a processing-time window, the other an event-time window. And one
uses a 1-minute window, the other 1 hour.
What purpose do we want to achieve with this?

2. The original intention of introducing `MapBundleFunction`,
`MapBundleOperator`, and `CountBundleTrigger` was to do MiniBatch
optimization for group aggregates, not window aggregates.
We cannot use them directly, or with only slight modification, to do
local aggregation for a window aggregate. I might have misled you in the
previous response; I'm very sorry about that.
There are more factors to consider when doing local aggregation for a
window aggregate. You could refer to the `LocalSlicingWindowAggOperator`
class, which does local aggregation for window aggregates in Flink SQL and
was introduced in Flink 1.13:
  (1) It accumulates input data into a local buffer with window
namespaces.
  (2) It flushes the local buffer when the size of the buffer exceeds a
threshold, and when the current progress passes the window end time.
If we just use `MapBundleFunction`, `MapBundleOperator`, and
`CountBundleTrigger` directly to do local-aggregate optimization for a
window aggregate, the result is unexpected.
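
To make (1) and (2) a bit more concrete, here is a rough, untested sketch of
the idea for the TaxiFare exercise. It is my own simplification for this
discussion, not the actual `LocalSlicingWindowAggOperator` code; the class
name, the hard-coded 1-hour window size, and the buffer-size threshold are
made up for illustration, and state handling/checkpointing is ignored.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class LocalWindowPreAggOperator extends AbstractStreamOperator<TaxiFare>
        implements OneInputStreamOperator<TaxiFare, TaxiFare> {

    // Made-up 1-hour tumbling window size, matching the exercise's hourly windows.
    private static final long WINDOW_SIZE_MS = 60 * 60 * 1000L;

    private final int maxBufferedEntries;

    // Local buffer keyed by (driverId, windowStart): the "window namespace" part.
    private transient Map<Tuple2<Long, Long>, TaxiFare> buffer;

    public LocalWindowPreAggOperator(int maxBufferedEntries) {
        this.maxBufferedEntries = maxBufferedEntries;
    }

    @Override
    public void open() throws Exception {
        super.open();
        buffer = new HashMap<>();
    }

    @Override
    public void processElement(StreamRecord<TaxiFare> element) throws Exception {
        TaxiFare fare = element.getValue();
        long eventTime = fare.startTime.toEpochMilli();
        long windowStart = eventTime - (eventTime % WINDOW_SIZE_MS);
        Tuple2<Long, Long> bufferKey = Tuple2.of(fare.driverId, windowStart);

        TaxiFare acc = buffer.get(bufferKey);
        if (acc == null) {
            buffer.put(bufferKey, fare);
        } else {
            acc.tip += fare.tip; // partial aggregate within the (key, window) namespace
        }

        if (buffer.size() >= maxBufferedEntries) {
            flush(Long.MAX_VALUE); // buffer too large: emit all partial results
        }
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        flush(mark.getTimestamp()); // emit windows whose end the watermark has passed
        super.processWatermark(mark);
    }

    private void flush(long watermark) {
        Iterator<Map.Entry<Tuple2<Long, Long>, TaxiFare>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Tuple2<Long, Long>, TaxiFare> entry = it.next();
            long windowEnd = entry.getKey().f1 + WINDOW_SIZE_MS;
            if (watermark == Long.MAX_VALUE || windowEnd <= watermark) {
                // NOTE: downstream still needs timestamps/watermarks and the real
                // keyed window aggregation; the buffer is also not checkpointed here.
                output.collect(new StreamRecord<>(entry.getValue()));
                it.remove();
            }
        }
    }
}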

There are three possible solutions:
(1) Could we use a plain group aggregate instead of a window aggregate to
achieve the purpose?
(2) Define our own local window aggregate operator. You could refer to the
`LocalSlicingWindowAggOperator` class in Flink 1.13.
(3) Directly use Flink SQL to do the window aggregate [1], and set
`table.optimizer.agg-phase-strategy` to `TWO_PHASE` to enable local
aggregation for the window aggregate.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/window-agg/
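
For solution (3), a minimal, untested sketch of what this could look like
on the Table API side (the table name `fares` and the columns `driverId`,
`tip`, and the event-time attribute `rowtime` are assumptions standing in
for however the TaxiFare stream would be registered):

    TableEnvironment tEnv =
            TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

    // Enable two-phase (local + global) aggregation in the planner.
    tEnv.getConfig().getConfiguration()
            .setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");

    // 1-hour tumbling window aggregate using the window TVF syntax described in [1].
    tEnv.executeSql(
            "SELECT window_start, window_end, driverId, SUM(tip) AS sumOfTips "
          + "FROM TABLE(TUMBLE(TABLE fares, DESCRIPTOR(rowtime), INTERVAL '1' HOUR)) "
          + "GROUP BY window_start, window_end, driverId");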

You are welcome to discuss this further.

Best,
JING ZHANG


Re: Pre shuffle aggregation in flink is not working

Posted by suman shil <cn...@gmail.com>.
Hi JING,
Thanks for the pointers.

1) I was able to debug why the variable `numOfElements` was getting reset
to 0. The following method of AbstractMapBundleOperator.java was getting
called, which was resetting the variable to 0 before it could reach the
max count.

    @Override
    public void finishBundle() throws Exception {
        if (!bundle.isEmpty()) {
            numOfElements = 0;
            function.finishBundle(bundle, collector);
            bundle.clear();
        }
        bundleTrigger.reset();
    }

2) To avoid this problem I created a simple aggregator which accumulates
the elements in a LinkedHashMap and outputs them when it reaches a max
count. I can see now that the bundle size is reaching the max and
output.collect is getting called, but I still don't see output. Here is
the new aggregator code.

public class MyAggregator<K, V, IN, OUT> extends AbstractStreamOperator<OUT>
        implements OneInputStreamOperator<IN, OUT> {

    private int count;
    private Map<K, OUT> bundle = new LinkedHashMap<>();
    private MapBundleFunction<K, OUT, IN, OUT> bundleFunction;
    private KeySelector<IN, K> keySelector;

    public MyAggregator(int count,
                        MapBundleFunction<K, OUT, IN, OUT> bundleFunction,
                        KeySelector<IN, K> keySelector) {
        this.count = count;
        this.bundleFunction = bundleFunction;
        this.keySelector = keySelector;
    }

    @Override
    public void open() throws Exception {
        bundle = new LinkedHashMap<>();
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        K key = getKey(element);
        OUT value = bundle.get(key);
        OUT newValue = bundleFunction.addInput(value, element.getValue());
        bundle.put(key, newValue);
        if (bundle.size() > count) {
            for (Map.Entry<K, OUT> entry : bundle.entrySet()) {
                output.collect(new StreamRecord<>(entry.getValue()));
            }
            bundle.clear();
        }
    }

    private K getKey(StreamRecord<IN> element) throws Exception {
        return keySelector.getKey(element.getValue());
    }
}

Following is the driver code.

public class HourlyTipsSolution extends ExerciseBase {

    /**
     * Main method.
     *
     * @throws Exception which occurs during job execution.
     */
    public static void main(String[] args) throws Exception {
        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(ExerciseBase.parallelism);

        // start the data generator
        DataStream<TaxiFare> fares = env.addSource(fareSourceOrTest(new TaxiFareGenerator()));

        // compute tips per hour for each driver
        MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> mapBundleFunction =
                new TaxiFareMapBundleFunction();
        BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(10);
        KeySelector<TaxiFare, Long> taxiFareLongKeySelector = new KeySelector<TaxiFare, Long>() {
            @Override
            public Long getKey(TaxiFare value) throws Exception {
                return value.driverId;
            }
        };
        MyAggregator<Long, TaxiFare, TaxiFare, TaxiFare> aggregator =
                new MyAggregator<>(10, mapBundleFunction, taxiFareLongKeySelector);
        DataStream<Tuple3<Long, Long, Float>> hourlyTips =
                fares.transform("preshuffle", TypeInformation.of(TaxiFare.class), aggregator)
                        .keyBy((TaxiFare fare) -> fare.driverId)
                        .window(TumblingProcessingTimeWindows.of(Time.hours(1)))
                        .process(new AddTips());
        DataStream<Tuple3<Long, Long, Float>> hourlyMax =
                hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);

        printOrTest(hourlyMax);

        // execute the transformation pipeline
        env.execute("Hourly Tips (java)");
    }

    /*
     * Wraps the pre-aggregated result into a tuple along with the window's timestamp and key.
     */
    public static class AddTips
            extends ProcessWindowFunction<TaxiFare, Tuple3<Long, Long, Float>, Long, TimeWindow> {

        @Override
        public void process(
                Long key,
                Context context,
                Iterable<TaxiFare> fares,
                Collector<Tuple3<Long, Long, Float>> out) {
            float sumOfTips = 0F;
            for (TaxiFare f : fares) {
                sumOfTips += f.tip;
            }
            out.collect(Tuple3.of(context.window().getEnd(), key, sumOfTips));
        }
    }
}

But the following driver code works when I remove the aggregator


public class HourlyTipsSolution extends ExerciseBase {

    /**
     * Main method.
     *
     * @throws Exception which occurs during job execution.
     */
    public static void main(String[] args) throws Exception {
        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(ExerciseBase.parallelism);

        // start the data generator
        DataStream<TaxiFare> fares = env.addSource(fareSourceOrTest(new TaxiFareGenerator()));

        // compute tips per hour for each driver
        MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> mapBundleFunction =
                new TaxiFareMapBundleFunction();
        BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(2);
        KeySelector<TaxiFare, Long> taxiFareLongKeySelector = new KeySelector<TaxiFare, Long>() {
            @Override
            public Long getKey(TaxiFare value) throws Exception {
                return value.driverId;
            }
        };
        MyAggregator<Long, TaxiFare, TaxiFare, TaxiFare> aggregator =
                new MyAggregator<>(10, mapBundleFunction, taxiFareLongKeySelector);
        DataStream<Tuple3<Long, Long, Float>> hourlyTips =
                fares.keyBy((TaxiFare fare) -> fare.driverId)
                        .window(TumblingEventTimeWindows.of(Time.hours(1)))
                        .process(new AddTips());
        DataStream<Tuple3<Long, Long, Float>> hourlyMax =
                hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.minutes(1))).maxBy(2);

        printOrTest(hourlyMax);

        // execute the transformation pipeline
        env.execute("Hourly Tips (java)");
    }

    /*
     * Wraps the pre-aggregated result into a tuple along with the window's timestamp and key.
     */
    public static class AddTips
            extends ProcessWindowFunction<TaxiFare, Tuple3<Long, Long, Float>, Long, TimeWindow> {

        @Override
        public void process(
                Long key,
                Context context,
                Iterable<TaxiFare> fares,
                Collector<Tuple3<Long, Long, Float>> out) {
            float sumOfTips = 0F;
            for (TaxiFare f : fares) {
                sumOfTips += f.tip;
            }
            out.collect(Tuple3.of(context.window().getEnd(), key, sumOfTips));
        }
    }
}


Re: Pre shuffle aggregation in flink is not working

Posted by JING ZHANG <be...@gmail.com>.
Hi Suman,
> But I am always seeing in the following code of `AbstractMapBundleOperator.java` that `numOfElements` is always 0.
That is weird. Please set a breakpoint at the line
`bundleTrigger.onElement(input);` in the `processElement` method to see
what happens when a record is processed by `processElement`.

> One more question, you mentioned that I need to test with `LinkedHashMap` instead of `HashMap`. Where should I make this change?
You could copy the class `AbstractMapBundleOperator` and update the
bundle initialization code in the `open` method.
Besides, MapBundleFunction, MapBundleOperator, and CountBundleTrigger are
not marked as @Public, so they have no guarantee of compatibility.
You'd better copy them for your own use.
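
As a rough illustration only (assuming you have copied
`AbstractMapBundleOperator` into your own project; everything else in the
copied `open` method stays as it is):

    @Override
    public void open() throws Exception {
        super.open();
        // ... keep the rest of the initialization from the copied class here ...
        // Only the bundle map changes, so that iteration order in finishBundle()
        // matches insertion order:
        this.bundle = new LinkedHashMap<>(); // was: new HashMap<>()
    }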

Best,
JING ZHANG


Re: Pre shuffle aggregation in flink is not working

Posted by suman shil <cn...@gmail.com>.
Hi Jing,
I tried using `MapBundleOperator` also (I have yet to test with
LinkedHashMap). But I am always seeing in the following code of
`AbstractMapBundleOperator.java` that `numOfElements` is always 0; it is
never getting incremented. I replaced `TaxiFareStream` with
`MapBundleOperator` in the above code. It should increment by 1 each time
an element is processed, but that is not happening.

    public void processElement(StreamRecord<IN> element) throws Exception {
        // get the key and value for the map bundle
        final IN input = element.getValue();
        final K bundleKey = getKey(input);
        final V bundleValue = bundle.get(bundleKey);

        // get a new value after adding this element to bundle
        final V newBundleValue = function.addInput(bundleValue, input);

        // update to map bundle
        bundle.put(bundleKey, newBundleValue);

        numOfElements++;
        bundleTrigger.onElement(input);
    }

One more question: you mentioned that I need to test with `LinkedHashMap`
instead of `HashMap`. Where should I make this change? Do I need to
create a class which extends `MapBundleOperator` and add it there?

Thanks



Re: Pre shuffle aggregation in flink is not working

Posted by JING ZHANG <be...@gmail.com>.
Hi Suman,
Please try copying `MapBundleOperator` and updating the `HashMap` to a
`LinkedHashMap` to keep the output sequence consistent with the input sequence.

Best,
JING ZHANG


Re: Pre shuffle aggregation in flink is not working

Posted by suman shil <cn...@gmail.com>.
Hi Jing,
Thanks for looking into this. Here is the code of `TaxiFareStream`. I was
following this link:
http://felipeogutierrez.blogspot.com/2019/04/implementing-my-own-stream-operator-in.html
Please let me know if there is any other way of aggregating elements
locally.

public class TaxiFareStream extends MapBundleOperator<Long, TaxiFare, TaxiFare, TaxiFare> {

    private KeySelector<TaxiFare, Long> keySelector;

    public TaxiFareStream(MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> userFunction,
                          BundleTrigger<TaxiFare> bundleTrigger,
                          KeySelector<TaxiFare, Long> keySelector) {
        super(userFunction, bundleTrigger, keySelector);
        this.keySelector = keySelector;
    }

    @Override
    protected Long getKey(TaxiFare input) throws Exception {
        return keySelector.getKey(input);
    }
}

Thanks


Re: Pre shuffle aggregation in flink is not working

Posted by JING ZHANG <be...@gmail.com>.
Hi Suman,
Would you please provide the code of `TaxiFareStream`? It seems we
could use `MapBundleOperator` directly here.
BTW, I have some concerns about using this solution to do local
aggregation for a window aggregation, because `MapBundleOperator`
saves input data in a bundle which is a HashMap object, and a HashMap
cannot preserve the input sequence of the data. I'm afraid the elements
within a bundle (in your case, 10 elements) may be emitted out of order,
and I'm not sure whether it is reasonable to assign watermarks based on
unordered timestamps.
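
As a small standalone illustration of the ordering point (plain Java,
nothing Flink-specific; the keys are arbitrary example values):

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class MapOrderDemo {
    public static void main(String[] args) {
        Map<Long, String> hash = new HashMap<>();
        Map<Long, String> linked = new LinkedHashMap<>();
        for (long key : new long[] {42L, 7L, 1013L, 3L}) {
            hash.put(key, "fare-" + key);
            linked.put(key, "fare-" + key);
        }
        // LinkedHashMap iterates in insertion order: 42, 7, 1013, 3.
        System.out.println("LinkedHashMap: " + linked.keySet());
        // HashMap iteration order depends on hashing, not on insertion order.
        System.out.println("HashMap:       " + hash.keySet());
    }
}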

Best,
JING ZHANG


