You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by pnowojski <gi...@git.apache.org> on 2017/08/30 15:54:29 UTC

[GitHub] flink pull request #4626: [FLINK-7561][streaming] Implement PreAggregationOp...

GitHub user pnowojski opened a pull request:

    https://github.com/apache/flink/pull/4626

    [FLINK-7561][streaming] Implement PreAggregationOperator

    ## What is the purpose of the change
    
    To improve performance in certain situations this PR adds basic implementation of pre-aggregation operator for DataStream API.
    
    ## Verifying this change
    
    This change added `PreAggregationOperatorTest`
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency):  no 
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): no
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? yes
      - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented): extensive JavaDocs.
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pnowojski/flink pre-aggregate

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4626.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4626
    
----
commit a87e014edba94bb968467c08872cd074e1a9051d
Author: Piotr Nowojski <pi...@gmail.com>
Date:   2017-08-30T13:06:10Z

    [FLINK-7561][streaming] Implement PreAggregationOperator
    
    Add first basic implementation of pre-aggregation for streaming API.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #4626: [FLINK-7561][streaming] Implement PreAggregationOperator

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/4626
  
    @mproch you might be interested in this one :)


---

[GitHub] flink issue #4626: [FLINK-7561][streaming] Implement PreAggregationOperator

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/4626
  
    I think it would be nice to have a utility here in order to make this easier to use:
    ```java
    DataStream result =
        Utils.withPreaggregation(
            stream.timeWindow(Time.minutes(5)), 
            myAggregateFunction
        )
        .apply(windowFunction);
    ```
    
    The utility would basically take the aggregate function and insert the stream transformation for the pre-aggregation on the "*predecessor* or the keyed stream, and then set up the `WindowedStream` again.
    
    Pseudo code:
    ```java
    public static <T, K, W extends Window, A> WindowedStream<T, K, W> preaggregate(
            WindowedStream<T, K, W> windowedStream,
            AggregateFunction<T, A, T> preAggregator) {
    
       // sanity check that the windowedStream has no custom trigger and evictor
    
       PreAggregationOperator preAggOp = new PreAggregationOperator(preAggregator, properties from windowed stream);
    
        DataStream<T> originalStream = 'get predecessor before keyBy from windowed stream'
        DataStream<T> preAggregated = originalStream.transform(preAggOp , ...);
    
        WindowedStream<T, K, W> windowedAgain = preAggregated
            .keyBy(key extractor from original windowed stream)
            .window(assigner);
    
        return windowedAgain;
    }
    ```


---

[GitHub] flink pull request #4626: [FLINK-7561][streaming] Implement PreAggregationOp...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4626#discussion_r136332646
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/PreAggregationOperator.java ---
    @@ -0,0 +1,249 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.aggregation;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.functions.AggregateFunction;
    +import org.apache.flink.api.common.state.ListState;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
    +import org.apache.flink.runtime.state.StateInitializationContext;
    +import org.apache.flink.runtime.state.StateSnapshotContext;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
    +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
    +import org.apache.flink.streaming.api.windowing.windows.Window;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +
    +import java.io.Serializable;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.Map;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * This operator perform preliminary aggregation of the input values on non-keyed stream. This means that the output
    + * is not fully aggregated, but only partially. It should be placed before keyBy of the final aggregation. This make it
    + * useful in couple of scenarios:
    + * <ol>
    + * 		<li>Performing keyBy operation with low number of distinct values in the key. In such case
    + * 		{@link PreAggregationOperator} can reduce both CPU usage and network usage, by pre-aggregating most of the
    + * 		values before shuffling them over the network.</li>
    + *  	<li>Increasing the parallelism above the number of distinct values for the task preceding the keyBy operation.</li>
    + *		<li>Handling the data skew of some of the key values. Normally if there is a data skew, a lot of work could be
    + *  	dumped onto one single CPU core in the cluster. With pre aggregation some of that work can be performed in more
    + *  	distributed fashion by the {@link PreAggregationOperator}.</li>
    + *  	<li>Output partitioning of the data source is correlated with keyBy partitioning. For example when data source
    + *  	is partitioned by day and keyBy function shuffles the data based by day and hour.</li>
    + * </ol>
    + *
    + * <p>Because this operator performs only pre aggregation, it doesn't output the result of {@link AggregateFunction}
    + * but rather it outputs a tuple containing the Key, Window, and Accumulator, where Accumulator is a partially
    + * aggregated result {@link AggregateFunction}.
    + *
    + * <p>Keep in mind that {@link PreAggregationOperator} can have significant higher memory consumption compared to
    + * normal aggregation. If the input data are either not partitioned or the input partitioning is not correlated with
    + * the {@code keySelector}, each instance {@link PreAggregationOperator} can end up having each own accumulators entry
    + * per each key. In other words in that case memory consumption is expected to be {@code parallelism} times larger
    + * compared to what {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator} would have.
    + *
    + * <p>It is expected that this operator should be followed by keyBy operation based on {@code tuple.f0} and after that
    + * followed by {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator} which perform
    + * {@link AggregateFunction#merge(ACC, ACC)}.
    + *
    + * <p>Because currently {@link PreAggregationOperator} does not use {@link org.apache.flink.streaming.api.TimerService}
    + * only two elements triggering policies are supported:
    + * <ol>
    + *     <li>Flush everything on any watermark.</li>
    + *     <li>Iterate over each element in the state on each watermark and emit it if watermark's timestamp exceeds
    + *     {@link Window#maxTimestamp()}.</li>
    + * </ol>
    + * The first option has a drawback that it will often unnecessary emit elements, that could potentially be further
    + * aggregated. The second one is quite CPU intensive if watermarks are emitted relatively often to the number of pre
    + * aggregated key values.
    + *
    + * <p>Other limitations and notes:
    + * <ol>
    + *     <li>{@link MergingWindowAssigner} is not supported.</li>
    + *     <li>{@link PreAggregationOperator} emits all of its data on each received watermark.</li>
    + *     <li>On restoring from checkpoint keys can be randomly shuffled between {@link PreAggregationOperator}
    + *     instances</li>.
    + * </ol>
    + */
    +@PublicEvolving
    +public class PreAggregationOperator<K, IN, ACC, W extends Window>
    +	extends AbstractStreamOperator<Tuple3<K, W, ACC>>
    +	implements OneInputStreamOperator<IN, Tuple3<K, W, ACC>>, Serializable {
    +
    +	protected final AggregateFunction<IN, ACC, ?> aggregateFunction;
    +	protected final KeySelector<IN, K> keySelector;
    +	protected final WindowAssigner<? super IN, W> windowAssigner;
    +	protected final TypeInformation<K> keyTypeInformation;
    +	protected final TypeInformation<ACC> accumulatorTypeInformation;
    +	protected final boolean flushAllOnWatermark;
    +	protected final Map<Tuple2<K, W>, ACC> aggregates = new HashMap<>();
    +
    +	protected transient WindowAssigner.WindowAssignerContext windowAssignerContext;
    +	protected transient ListState<Tuple3<K, W, ACC>> aggregatesState;
    +
    +	/**
    +	 * Creates {@link PreAggregationOperator}.
    +	 *
    +	 * @param aggregateFunction function used for aggregation. Note, {@link AggregateFunction#getResult(Object)} will
    +	 *                          not be used.
    +	 * @param keySelector
    +	 * @param keyTypeInformation
    +	 * @param accumulatorTypeInformation
    +	 * @param windowAssigner
    +	 * @param flushAllOnWatermark flag to control whether all elements should be emitted on any watermark. Check more
    +	 *                            information in {@link PreAggregationOperator}.
    +	 */
    +	public PreAggregationOperator(
    +			AggregateFunction<IN, ACC, ?> aggregateFunction,
    +			KeySelector<IN, K> keySelector,
    +			TypeInformation<K> keyTypeInformation,
    +			TypeInformation<ACC> accumulatorTypeInformation,
    +			WindowAssigner<? super IN, W> windowAssigner,
    +			boolean flushAllOnWatermark) {
    +		this.aggregateFunction = checkNotNull(aggregateFunction, "aggregateFunction is null");
    +		this.keySelector = checkNotNull(keySelector, "keySelector is null");
    +		this.windowAssigner = checkNotNull(windowAssigner, "windowAssigner is null");
    +		this.keyTypeInformation = checkNotNull(keyTypeInformation, "keyTypeInformation is null");
    +		this.accumulatorTypeInformation = checkNotNull(accumulatorTypeInformation, "accumulatorTypeInformation is null");
    +		this.flushAllOnWatermark = flushAllOnWatermark;
    +
    +		checkNotNull(keyTypeInformation, "keyTypeInformation is null");
    +		checkNotNull(accumulatorTypeInformation, "accumulatorTypeInformation is null");
    +
    +		checkArgument(!(windowAssigner instanceof MergingWindowAssigner),
    +			"MergingWindowAssigner is not supported by the PreAggregationOperator");
    +	}
    +
    +	@Override
    +	public void open() throws Exception {
    +		windowAssignerContext = new WindowAssigner.WindowAssignerContext() {
    +			@Override
    +			public long getCurrentProcessingTime() {
    +				return System.currentTimeMillis();
    --- End diff --
    
    Changed to throwing `UnsupportedException`, since processing time doesn't make sense combined with pre aggregation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4626: [FLINK-7561][streaming] Implement PreAggregationOp...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4626#discussion_r136293381
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/PreAggregationOperator.java ---
    @@ -0,0 +1,249 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.aggregation;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.functions.AggregateFunction;
    +import org.apache.flink.api.common.state.ListState;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
    +import org.apache.flink.runtime.state.StateInitializationContext;
    +import org.apache.flink.runtime.state.StateSnapshotContext;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
    +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
    +import org.apache.flink.streaming.api.windowing.windows.Window;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +
    +import java.io.Serializable;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.Map;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * This operator perform preliminary aggregation of the input values on non-keyed stream. This means that the output
    + * is not fully aggregated, but only partially. It should be placed before keyBy of the final aggregation. This make it
    + * useful in couple of scenarios:
    + * <ol>
    + * 		<li>Performing keyBy operation with low number of distinct values in the key. In such case
    + * 		{@link PreAggregationOperator} can reduce both CPU usage and network usage, by pre-aggregating most of the
    + * 		values before shuffling them over the network.</li>
    + *  	<li>Increasing the parallelism above the number of distinct values for the task preceding the keyBy operation.</li>
    + *		<li>Handling the data skew of some of the key values. Normally if there is a data skew, a lot of work could be
    + *  	dumped onto one single CPU core in the cluster. With pre aggregation some of that work can be performed in more
    + *  	distributed fashion by the {@link PreAggregationOperator}.</li>
    + *  	<li>Output partitioning of the data source is correlated with keyBy partitioning. For example when data source
    + *  	is partitioned by day and keyBy function shuffles the data based by day and hour.</li>
    + * </ol>
    + *
    + * <p>Because this operator performs only pre aggregation, it doesn't output the result of {@link AggregateFunction}
    + * but rather it outputs a tuple containing the Key, Window, and Accumulator, where Accumulator is a partially
    + * aggregated result {@link AggregateFunction}.
    + *
    + * <p>Keep in mind that {@link PreAggregationOperator} can have significant higher memory consumption compared to
    + * normal aggregation. If the input data are either not partitioned or the input partitioning is not correlated with
    + * the {@code keySelector}, each instance {@link PreAggregationOperator} can end up having each own accumulators entry
    + * per each key. In other words in that case memory consumption is expected to be {@code parallelism} times larger
    + * compared to what {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator} would have.
    + *
    + * <p>It is expected that this operator should be followed by keyBy operation based on {@code tuple.f0} and after that
    + * followed by {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator} which perform
    + * {@link AggregateFunction#merge(ACC, ACC)}.
    + *
    + * <p>Because currently {@link PreAggregationOperator} does not use {@link org.apache.flink.streaming.api.TimerService}
    + * only two elements triggering policies are supported:
    + * <ol>
    + *     <li>Flush everything on any watermark.</li>
    + *     <li>Iterate over each element in the state on each watermark and emit it if watermark's timestamp exceeds
    + *     {@link Window#maxTimestamp()}.</li>
    + * </ol>
    + * The first option has a drawback that it will often unnecessary emit elements, that could potentially be further
    + * aggregated. The second one is quite CPU intensive if watermarks are emitted relatively often to the number of pre
    + * aggregated key values.
    + *
    + * <p>Other limitations and notes:
    + * <ol>
    + *     <li>{@link MergingWindowAssigner} is not supported.</li>
    + *     <li>{@link PreAggregationOperator} emits all of its data on each received watermark.</li>
    + *     <li>On restoring from checkpoint keys can be randomly shuffled between {@link PreAggregationOperator}
    + *     instances</li>.
    + * </ol>
    + */
    +@PublicEvolving
    +public class PreAggregationOperator<K, IN, ACC, W extends Window>
    +	extends AbstractStreamOperator<Tuple3<K, W, ACC>>
    +	implements OneInputStreamOperator<IN, Tuple3<K, W, ACC>>, Serializable {
    +
    +	protected final AggregateFunction<IN, ACC, ?> aggregateFunction;
    +	protected final KeySelector<IN, K> keySelector;
    +	protected final WindowAssigner<? super IN, W> windowAssigner;
    +	protected final TypeInformation<K> keyTypeInformation;
    +	protected final TypeInformation<ACC> accumulatorTypeInformation;
    +	protected final boolean flushAllOnWatermark;
    +	protected final Map<Tuple2<K, W>, ACC> aggregates = new HashMap<>();
    +
    +	protected transient WindowAssigner.WindowAssignerContext windowAssignerContext;
    +	protected transient ListState<Tuple3<K, W, ACC>> aggregatesState;
    +
    +	/**
    +	 * Creates {@link PreAggregationOperator}.
    +	 *
    +	 * @param aggregateFunction function used for aggregation. Note, {@link AggregateFunction#getResult(Object)} will
    +	 *                          not be used.
    +	 * @param keySelector
    +	 * @param keyTypeInformation
    +	 * @param accumulatorTypeInformation
    +	 * @param windowAssigner
    +	 * @param flushAllOnWatermark flag to control whether all elements should be emitted on any watermark. Check more
    +	 *                            information in {@link PreAggregationOperator}.
    +	 */
    +	public PreAggregationOperator(
    +			AggregateFunction<IN, ACC, ?> aggregateFunction,
    +			KeySelector<IN, K> keySelector,
    +			TypeInformation<K> keyTypeInformation,
    +			TypeInformation<ACC> accumulatorTypeInformation,
    +			WindowAssigner<? super IN, W> windowAssigner,
    +			boolean flushAllOnWatermark) {
    +		this.aggregateFunction = checkNotNull(aggregateFunction, "aggregateFunction is null");
    +		this.keySelector = checkNotNull(keySelector, "keySelector is null");
    +		this.windowAssigner = checkNotNull(windowAssigner, "windowAssigner is null");
    +		this.keyTypeInformation = checkNotNull(keyTypeInformation, "keyTypeInformation is null");
    +		this.accumulatorTypeInformation = checkNotNull(accumulatorTypeInformation, "accumulatorTypeInformation is null");
    +		this.flushAllOnWatermark = flushAllOnWatermark;
    +
    +		checkNotNull(keyTypeInformation, "keyTypeInformation is null");
    +		checkNotNull(accumulatorTypeInformation, "accumulatorTypeInformation is null");
    +
    +		checkArgument(!(windowAssigner instanceof MergingWindowAssigner),
    +			"MergingWindowAssigner is not supported by the PreAggregationOperator");
    +	}
    +
    +	@Override
    +	public void open() throws Exception {
    +		windowAssignerContext = new WindowAssigner.WindowAssignerContext() {
    +			@Override
    +			public long getCurrentProcessingTime() {
    +				return System.currentTimeMillis();
    --- End diff --
    
    This should use the `ProcessingTimeService` that can be retrieve via `getProcessingTimeService()`. Then you can also test the operator with processing time because the test harness allows advancing processing time manually.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #4626: [FLINK-7561][streaming] Implement PreAggregationOperator

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/4626
  
    I have benchmarked this code and here are the results of using this operator for tumbling windows, with 1000 number of distinct values in the key and watermark once every 16000 elements:
    
    Benchmark | Score | Error | Units
    ---------- | ----- | ----- | -----
    WindowBenchmarks.preAggregateTumblingWindow | 4136.634 | ± 72.370 | ops/ms
    WindowBenchmarks.tumblingWindow | 2592.292 | ± 67.317 | ops/ms
    
    (benchmark code: https://github.com/dataArtisans/flink-benchmarks/pull/2 )


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #4626: [FLINK-7561][streaming] Implement PreAggregationOperator

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/4626
  
    I moved everything to `flink-streaming-contrib`.
    
    @StephanEwen I went one step further and created following helper function:
    ```
    	public static <K, IN, ACC, OUT, W extends Window> SingleOutputStreamOperator<OUT> aggregateWithPreAggregation(
    			DataStream<IN> input,
    			KeySelector<IN, K> keySelector,
    			AggregateFunction<IN, ACC, OUT> aggregateFunction,
    			WindowAssigner<? super IN, W> windowAssigner)
    ```
    
    It adds a final aggregation step as well. With version that you proposed user would have to implement two slightly different `windowAssigner` and `aggregationFunction` for both the pre aggregation and final aggregation step. This could lead to a confusion and mistakes. Those functions have to be different because assigning windows, creating accumulators and accumulating happens only in the pre aggregation step and it works on `INPUT` data type, while in final aggregation we are working on a `Tuple3<KEY, WINDOW, ACCUMULATOR>` and we perform only merging of already created windows and accumulators. 


---