You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/05 18:26:20 UTC
[15/34] incubator-flink git commit: [streaming] New windowing API
merge and cleanup + several minor fixes
[streaming] New windowing API merge and cleanup + several minor fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/f96ba06e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/f96ba06e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/f96ba06e
Branch: refs/heads/master
Commit: f96ba06e0510690af8c44ae5ef09dda15d2e1ff4
Parents: 751f101
Author: Gyula Fora <gy...@apache.org>
Authored: Sun Nov 23 18:54:44 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Fri Dec 5 16:45:09 2014 +0100
----------------------------------------------------------------------
.../api/datastream/BatchedDataStream.java | 414 -------------------
.../streaming/api/datastream/DataStream.java | 96 -----
.../api/datastream/WindowDataStream.java | 92 -----
.../api/datastream/WindowedDataStream.java | 403 ++++++++++++++++--
.../operator/BatchGroupReduceInvokable.java | 163 --------
.../operator/BatchReduceInvokable.java | 224 ----------
.../GroupedBatchGroupReduceInvokable.java | 61 ---
.../operator/GroupedBatchReduceInvokable.java | 58 ---
.../GroupedWindowGroupReduceInvokable.java | 128 ------
.../operator/GroupedWindowReduceInvokable.java | 106 -----
.../operator/GroupedWindowingInvokable.java | 41 +-
.../operator/WindowGroupReduceInvokable.java | 108 -----
.../operator/WindowReduceInvokable.java | 124 ------
.../operator/WindowingGroupInvokable.java | 43 ++
.../invokable/operator/WindowingInvokable.java | 44 +-
.../operator/WindowingReduceInvokable.java | 59 +++
.../streaming/api/windowing/helper/Count.java | 25 +-
.../streaming/api/windowing/helper/Time.java | 79 ++--
.../windowing/policy/CountTriggerPolicy.java | 2 +-
.../windowing/policy/TimeEvictionPolicy.java | 4 +-
.../api/windowing/policy/TimeTriggerPolicy.java | 55 +--
.../apache/flink/streaming/api/PrintTest.java | 60 ++-
.../operator/BatchGroupReduceTest.java | 98 -----
.../api/invokable/operator/BatchReduceTest.java | 85 ----
.../operator/GroupedBatchGroupReduceTest.java | 101 -----
.../operator/GroupedBatchReduceTest.java | 107 -----
.../GroupedWindowGroupReduceInvokableTest.java | 99 -----
.../operator/GroupedWindowingInvokableTest.java | 44 +-
.../WindowGroupReduceInvokableTest.java | 124 ------
.../operator/WindowReduceInvokableTest.java | 130 ------
.../operator/WindowingInvokableTest.java | 42 +-
.../windowing/policy/TimeTriggerPolicyTest.java | 39 +-
.../ml/IncrementalLearningSkeleton.java | 6 +-
.../examples/windowing/BasicExample.java | 76 ----
.../windowing/MultiplePoliciesExample.java | 38 +-
.../examples/windowing/SlidingExample.java | 27 +-
.../windowing/TimeWindowingExample.java | 57 +--
37 files changed, 771 insertions(+), 2691 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
deleted file mode 100755
index c8a49c6..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
+++ /dev/null
@@ -1,414 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType;
-import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
-import org.apache.flink.streaming.api.function.aggregation.SumAggregator;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.invokable.operator.BatchGroupReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.GroupedBatchGroupReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.GroupedBatchReduceInvokable;
-import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
-
-/**
- * A {@link BatchedDataStream} represents a data stream whose elements are
- * batched together in a sliding batch. operations like
- * {@link #reduce(ReduceFunction)} or {@link #reduceGroup(GroupReduceFunction)}
- * are applied for each batch and the batch is slid afterwards.
- *
- * @param <OUT>
- * The output type of the {@link BatchedDataStream}
- */
-public class BatchedDataStream<OUT> {
-
- protected DataStream<OUT> dataStream;
- protected boolean isGrouped;
- protected KeySelector<OUT, ?> keySelector;
- protected long batchSize;
- protected long slideSize;
-
- protected BatchedDataStream(DataStream<OUT> dataStream, long batchSize, long slideSize) {
- if (dataStream instanceof GroupedDataStream) {
- this.isGrouped = true;
- this.keySelector = ((GroupedDataStream<OUT>) dataStream).keySelector;
- } else {
- this.isGrouped = false;
- }
- this.dataStream = dataStream.copy();
- this.batchSize = batchSize;
- this.slideSize = slideSize;
- }
-
- protected BatchedDataStream(BatchedDataStream<OUT> batchedDataStream) {
- this.dataStream = batchedDataStream.dataStream.copy();
- this.isGrouped = batchedDataStream.isGrouped;
- this.keySelector = batchedDataStream.keySelector;
- this.batchSize = batchedDataStream.batchSize;
- this.slideSize = batchedDataStream.slideSize;
- }
-
- /**
- * Groups the elements of the {@link BatchedDataStream} by the given key
- * positions to be used with grouped operators.
- *
- * @param fields
- * The position of the fields on which the
- * {@link BatchedDataStream} will be grouped.
- * @return The transformed {@link BatchedDataStream}
- */
- public BatchedDataStream<OUT> groupBy(int... fields) {
- return new BatchedDataStream<OUT>(dataStream.groupBy(fields), batchSize, slideSize);
- }
-
- /**
- * Groups a {@link BatchedDataStream} using field expressions. A field
- * expression is either the name of a public field or a getter method with
- * parentheses of the {@link BatchedDataStream}S underlying type. A dot can
- * be used to drill down into objects, as in
- * {@code "field1.getInnerField2()" }.
- *
- * @param fields
- * One or more field expressions on which the DataStream will be
- * grouped.
- * @return The grouped {@link BatchedDataStream}
- **/
- public BatchedDataStream<OUT> groupBy(String... fields) {
-
- return new BatchedDataStream<OUT>(dataStream.groupBy(fields), batchSize, slideSize);
-
- }
-
- /**
- * Applies a reduce transformation on every sliding batch/window of the data
- * stream. If the data stream is grouped then the reducer is applied on
- * every group of elements sharing the same key. This type of reduce is much
- * faster than reduceGroup since the reduce function can be applied
- * incrementally. The user can also extend the {@link RichReduceFunction} to
- * gain access to other features provided by the {@link RichFuntion}
- * interface.
- *
- * @param reducer
- * The {@link ReduceFunction} that will be called for every
- * element of the input values in the batch/window.
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
- return dataStream.addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
- ReduceFunction.class, 0), new FunctionTypeWrapper<OUT>(reducer,
- ReduceFunction.class, 0), getReduceInvokable(reducer));
- }
-
- /**
- * Applies a reduceGroup transformation on preset batches/windows of the
- * DataStream. The transformation calls a {@link GroupReduceFunction} for
- * each batch/window. Each GroupReduceFunction call can return any number of
- * elements including none. The user can also extend
- * {@link RichGroupReduceFunction} to gain access to other features provided
- * by the {@link RichFuntion} interface.
- *
- * @param reducer
- * The {@link GroupReduceFunction} that will be called for every
- * batch/window.
- * @return The transformed DataStream.
- */
- public <R> SingleOutputStreamOperator<R, ?> reduceGroup(GroupReduceFunction<OUT, R> reducer) {
- return dataStream.addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
- GroupReduceFunction.class, 0), new FunctionTypeWrapper<R>(reducer,
- GroupReduceFunction.class, 1), getGroupReduceInvokable(reducer));
- }
-
- /**
- * Applies an aggregation that sums every sliding batch/window of the data
- * stream at the given position.
- *
- * @param positionToSum
- * The position in the data point to sum
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<OUT, ?> sum(int positionToSum) {
- dataStream.checkFieldRange(positionToSum);
- return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(positionToSum,
- dataStream.getClassAtPos(positionToSum), dataStream.getOutputType()));
- }
-
- /**
- * Syntactic sugar for sum(0)
- *
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<OUT, ?> sum() {
- return sum(0);
- }
-
- /**
- * Applies an aggregation that that gives the sum of the pojo data stream at
- * the given field expression. A field expression is either the name of a
- * public field or a getter method with parentheses of the
- * {@link DataStream}S underlying type. A dot can be used to drill down into
- * objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field
- * The field expression based on which the aggregation will be
- * applied.
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<OUT, ?> sum(String field) {
- return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field,
- getOutputType()));
- }
-
- /**
- * Applies an aggregation that that gives the minimum of every sliding
- * batch/window of the data stream at the given position.
- *
- * @param positionToMin
- * The position in the data point to minimize
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<OUT, ?> min(int positionToMin) {
- dataStream.checkFieldRange(positionToMin);
- return aggregate(ComparableAggregator.getAggregator(positionToMin, getOutputType(),
- AggregationType.MIN));
- }
-
- /**
- * Applies an aggregation that gives the minimum element of every sliding
- * batch/window of the data stream by the given position. If more elements
- * have the same minimum value the operator returns the first element by
- * default.
- *
- * @param positionToMinBy
- * The position in the data point to minimize
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy) {
- return this.minBy(positionToMinBy, true);
- }
-
- /**
- * Applies an aggregation that gives the minimum element of every sliding
- * batch/window of the data stream by the given position. If more elements
- * have the same minimum value the operator returns either the first or last
- * one depending on the parameter setting.
- *
- * @param positionToMinBy
- * The position in the data point to minimize
- * @param first
- * If true, then the operator return the first element with the
- * minimum value, otherwise returns the last
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy, boolean first) {
- dataStream.checkFieldRange(positionToMinBy);
- return aggregate(ComparableAggregator.getAggregator(positionToMinBy, getOutputType(),
- AggregationType.MINBY, first));
- }
-
- /**
- * Syntactic sugar for min(0)
- *
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<OUT, ?> min() {
- return min(0);
- }
-
- /**
- * Applies an aggregation that gives the maximum of every sliding
- * batch/window of the data stream at the given position.
- *
- * @param positionToMax
- * The position in the data point to maximize
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<OUT, ?> max(int positionToMax) {
- dataStream.checkFieldRange(positionToMax);
- return aggregate(ComparableAggregator.getAggregator(positionToMax, getOutputType(),
- AggregationType.MAX));
- }
-
- /**
- * Applies an aggregation that gives the maximum element of every sliding
- * batch/window of the data stream by the given position. If more elements
- * have the same maximum value the operator returns the first by default.
- *
- * @param positionToMaxBy
- * The position in the data point to maximize
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy) {
- return this.maxBy(positionToMaxBy, true);
- }
-
- /**
- * Applies an aggregation that gives the maximum element of every sliding
- * batch/window of the data stream by the given position. If more elements
- * have the same maximum value the operator returns either the first or last
- * one depending on the parameter setting.
- *
- * @param positionToMaxBy
- * The position in the data point to maximize
- * @param first
- * If true, then the operator return the first element with the
- * maximum value, otherwise returns the last
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy, boolean first) {
- dataStream.checkFieldRange(positionToMaxBy);
- return aggregate(ComparableAggregator.getAggregator(positionToMaxBy, getOutputType(),
- AggregationType.MAXBY, first));
- }
-
- /**
- * Syntactic sugar for max(0)
- *
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<OUT, ?> max() {
- return max(0);
- }
-
- /**
- * Applies an aggregation that that gives the minimum of the pojo data
- * stream at the given field expression. A field expression is either the
- * name of a public field or a getter method with parentheses of the
- * {@link DataStream}S underlying type. A dot can be used to drill down into
- * objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field
- * The field expression based on which the aggregation will be
- * applied.
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<OUT, ?> min(String field) {
- return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
- AggregationType.MIN, false));
- }
-
- /**
- * Applies an aggregation that that gives the maximum of the pojo data
- * stream at the given field expression. A field expression is either the
- * name of a public field or a getter method with parentheses of the
- * {@link DataStream}S underlying type. A dot can be used to drill down into
- * objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field
- * The field expression based on which the aggregation will be
- * applied.
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<OUT, ?> max(String field) {
- return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
- AggregationType.MAX, false));
- }
-
- /**
- * Applies an aggregation that that gives the minimum element of the pojo
- * data stream by the given field expression. A field expression is either
- * the name of a public field or a getter method with parentheses of the
- * {@link DataStream}S underlying type. A dot can be used to drill down into
- * objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field
- * The field expression based on which the aggregation will be
- * applied.
- * @param first
- * If True then in case of field equality the first object will
- * be returned
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<OUT, ?> minBy(String field, boolean first) {
- return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
- AggregationType.MINBY, first));
- }
-
- /**
- * Applies an aggregation that that gives the maximum element of the pojo
- * data stream by the given field expression. A field expression is either
- * the name of a public field or a getter method with parentheses of the
- * {@link DataStream}S underlying type. A dot can be used to drill down into
- * objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field
- * The field expression based on which the aggregation will be
- * applied.
- * @param first
- * If True then in case of field equality the first object will
- * be returned
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<OUT, ?> maxBy(String field, boolean first) {
- return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
- AggregationType.MAXBY, first));
- }
-
- /**
- * Gets the output type.
- *
- * @return The output type.
- */
- public TypeInformation<OUT> getOutputType() {
- return dataStream.getOutputType();
- }
-
- private SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
- StreamInvokable<OUT, OUT> invokable = getReduceInvokable(aggregate);
-
- SingleOutputStreamOperator<OUT, ?> returnStream = dataStream.addFunction("batchReduce",
- aggregate, dataStream.outTypeWrapper, dataStream.outTypeWrapper, invokable);
-
- return returnStream;
- }
-
- protected BatchReduceInvokable<OUT> getReduceInvokable(ReduceFunction<OUT> reducer) {
- BatchReduceInvokable<OUT> invokable;
- if (isGrouped) {
- invokable = new GroupedBatchReduceInvokable<OUT>(reducer, batchSize, slideSize,
- keySelector);
- } else {
- invokable = new BatchReduceInvokable<OUT>(reducer, batchSize, slideSize);
- }
- return invokable;
- }
-
- protected <R> BatchGroupReduceInvokable<OUT, R> getGroupReduceInvokable(
- GroupReduceFunction<OUT, R> reducer) {
- BatchGroupReduceInvokable<OUT, R> invokable;
- if (isGrouped) {
- invokable = new GroupedBatchGroupReduceInvokable<OUT, R>(reducer, batchSize, slideSize,
- keySelector);
- } else {
- invokable = new BatchGroupReduceInvokable<OUT, R>(reducer, batchSize, slideSize);
- }
- return invokable;
- }
-
- protected BatchedDataStream<OUT> copy() {
- return new BatchedDataStream<OUT>(this);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index c38a638..2ce28d9 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -663,7 +663,6 @@ public class DataStream<OUT> {
return new GroupedDataStream<OUT>(this, keySelector);
}
-
/**
* This allows you to set up windowing through a nice API using
* {@link WindowingHelper} such as {@link Time}, {@link Count} and
@@ -678,101 +677,6 @@ public class DataStream<OUT> {
public WindowedDataStream<OUT> window(WindowingHelper... policyHelpers) {
return new WindowedDataStream<OUT>(this, policyHelpers);
}
-
- /**
- * Collects the data stream elements into sliding batches creating a new
- * {@link BatchedDataStream}. The user can apply transformations like
- * {@link BatchedDataStream#reduce}, {@link BatchedDataStream#reduceGroup}
- * or aggregations on the {@link BatchedDataStream}.
- *
- * @param batchSize
- * The number of elements in each batch at each operator
- * @param slideSize
- * The number of elements with which the batches are slid by
- * after each transformation.
- * @return The transformed {@link DataStream}
- */
- public BatchedDataStream<OUT> batch(long batchSize, long slideSize) {
- if (batchSize < 1) {
- throw new IllegalArgumentException("Batch size must be positive");
- }
- if (slideSize < 1) {
- throw new IllegalArgumentException("Slide size must be positive");
- }
- return new BatchedDataStream<OUT>(this, batchSize, slideSize);
- }
-
- /**
- * Collects the data stream elements into sliding batches creating a new
- * {@link BatchedDataStream}. The user can apply transformations like
- * {@link BatchedDataStream#reduce}, {@link BatchedDataStream#reduceGroup}
- * or aggregations on the {@link BatchedDataStream}.
- *
- * @param batchSize
- * The number of elements in each batch at each operator
- * @return The transformed {@link DataStream}
- */
- public BatchedDataStream<OUT> batch(long batchSize) {
- return batch(batchSize, batchSize);
- }
-
- /**
- * Collects the data stream elements into sliding windows creating a new
- * {@link WindowDataStream}. The user can apply transformations like
- * {@link WindowDataStream#reduce}, {@link WindowDataStream#reduceGroup} or
- * aggregations on the {@link WindowDataStream}.
- *
- * @param windowSize
- * The length of the window in milliseconds.
- * @param slideInterval
- * The number of milliseconds with which the windows are slid by
- * after each transformation.
- * @param timestamp
- * User defined function for extracting time-stamps from each
- * element
- * @return The transformed {@link DataStream}
- */
- public WindowDataStream<OUT> window(long windowSize, long slideInterval,
- TimeStamp<OUT> timestamp) {
- if (windowSize < 1) {
- throw new IllegalArgumentException("Window size must be positive");
- }
- if (slideInterval < 1) {
- throw new IllegalArgumentException("Slide interval must be positive");
- }
- return new WindowDataStream<OUT>(this, windowSize, slideInterval, timestamp);
- }
-
- /**
- * Collects the data stream elements into sliding windows creating a new
- * {@link WindowDataStream}. The user can apply transformations like
- * {@link WindowDataStream#reduce}, {@link WindowDataStream#reduceGroup} or
- * aggregations on the {@link WindowDataStream}.
- *
- * @param windowSize
- * The length of the window in milliseconds.
- * @param slideInterval
- * The number of milliseconds with which the windows are slid by
- * after each transformation.
- * @return The transformed {@link DataStream}
- */
- public WindowDataStream<OUT> window(long windowSize, long slideInterval) {
- return window(windowSize, slideInterval, new DefaultTimeStamp<OUT>());
- }
-
- /**
- * Collects the data stream elements into sliding windows creating a new
- * {@link WindowDataStream}. The user can apply transformations like
- * {@link WindowDataStream#reduce}, {@link WindowDataStream#reduceGroup} or
- * aggregations on the {@link WindowDataStream}.
- *
- * @param windowSize
- * The length of the window in milliseconds.
- * @return The transformed {@link DataStream}
- */
- public WindowDataStream<OUT> window(long windowSize) {
- return window(windowSize, windowSize);
- }
/**
* Applies an aggregation that sums the data stream at the given position.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java
deleted file mode 100755
index be0e37f..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.api.invokable.operator.BatchGroupReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.GroupedWindowGroupReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.GroupedWindowReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.WindowGroupReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-
-/**
- * A {@link WindowDataStream} represents a data stream whose elements are
- * batched together in a sliding window. operations like
- * {@link #reduce(ReduceFunction)} or {@link #reduceGroup(GroupReduceFunction)}
- * are applied for each window and the window is slid afterwards.
- *
- * @param <OUT>
- * The output type of the {@link WindowDataStream}
- */
-public class WindowDataStream<OUT> extends BatchedDataStream<OUT> {
-
- TimeStamp<OUT> timeStamp;
-
- protected WindowDataStream(DataStream<OUT> dataStream, long windowSize, long slideInterval,
- TimeStamp<OUT> timeStamp) {
- super(dataStream, windowSize, slideInterval);
- this.timeStamp = timeStamp;
- }
-
- protected WindowDataStream(WindowDataStream<OUT> windowDataStream) {
- super(windowDataStream);
- this.timeStamp = windowDataStream.timeStamp;
- }
-
- public WindowDataStream<OUT> groupBy(int... keyPositions) {
- return new WindowDataStream<OUT>(dataStream.groupBy(keyPositions), batchSize, slideSize,
- timeStamp);
- }
-
- public WindowDataStream<OUT> groupBy(String... fields) {
- return new WindowDataStream<OUT>(dataStream.groupBy(fields), batchSize, slideSize,
- timeStamp);
- }
-
- protected BatchReduceInvokable<OUT> getReduceInvokable(ReduceFunction<OUT> reducer) {
- WindowReduceInvokable<OUT> invokable;
- if (isGrouped) {
- invokable = new GroupedWindowReduceInvokable<OUT>(reducer, batchSize, slideSize,
- keySelector, timeStamp);
- } else {
- invokable = new WindowReduceInvokable<OUT>(reducer, batchSize, slideSize, timeStamp);
- }
- return invokable;
- }
-
- protected <R> BatchGroupReduceInvokable<OUT, R> getGroupReduceInvokable(
- GroupReduceFunction<OUT, R> reducer) {
- BatchGroupReduceInvokable<OUT, R> invokable;
- if (isGrouped) {
- invokable = new GroupedWindowGroupReduceInvokable<OUT, R>(reducer, batchSize,
- slideSize, keySelector, timeStamp);
- } else {
- invokable = new WindowGroupReduceInvokable<OUT, R>(reducer, batchSize, slideSize,
- timeStamp);
- }
- return invokable;
- }
-
- public WindowDataStream<OUT> copy() {
- return new WindowDataStream<OUT>(this);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 2e6b885..dd26e64 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -17,25 +17,34 @@
package org.apache.flink.streaming.api.datastream;
+import java.util.ArrayList;
import java.util.LinkedList;
+import java.util.List;
-import org.apache.commons.lang.ArrayUtils;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.invokable.operator.WindowingInvokable;
+import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType;
+import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
+import org.apache.flink.streaming.api.function.aggregation.SumAggregator;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.invokable.operator.GroupedWindowingInvokable;
+import org.apache.flink.streaming.api.invokable.operator.WindowingGroupInvokable;
+import org.apache.flink.streaming.api.invokable.operator.WindowingReduceInvokable;
import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
+import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
-import org.apache.flink.streaming.util.serialization.CombineTypeWrapper;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
-import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
/**
- * A {@link WindowedDataStream} represents a data stream whose elements
- * are batched together in a sliding batch. operations like
+ * A {@link WindowedDataStream} represents a data stream whose elements are
+ * batched together in a sliding batch. operations like
* {@link #reduce(ReduceFunction)} or {@link #reduceGroup(GroupReduceFunction)}
* are applied for each batch and the batch is slid afterwards.
*
@@ -48,19 +57,31 @@ public class WindowedDataStream<OUT> {
protected boolean isGrouped;
protected KeySelector<OUT, ?> keySelector;
- protected WindowingHelper<OUT>[] triggerPolicies;
- protected WindowingHelper<OUT>[] evictionPolicies;
+ protected List<WindowingHelper<OUT>> triggerPolicies;
+ protected List<WindowingHelper<OUT>> evictionPolicies;
+
+ /**
+ * Creates a {@link WindowedDataStream} on a copy of the given stream. The
+ * supplied helpers are stored as the trigger policies; eviction policies
+ * are not assigned by this constructor. If the input is a
+ * {@link GroupedDataStream}, its key selector is taken over and the
+ * windowed stream is marked as grouped.
+ *
+ * @param dataStream
+ * The stream to window.
+ * @param policyHelpers
+ * The helpers used as trigger policies.
+ */
+ protected WindowedDataStream(DataStream<OUT> dataStream, WindowingHelper<OUT>... policyHelpers) {
+ this.dataStream = dataStream.copy();
+ this.triggerPolicies = new ArrayList<WindowingHelper<OUT>>();
+ for (WindowingHelper<OUT> helper : policyHelpers) {
+ this.triggerPolicies.add(helper);
+ }
- protected WindowedDataStream(DataStream<OUT> dataStream,
- WindowingHelper<OUT>... policyHelpers) {
+ // Inherit grouping (and its key selector) from a GroupedDataStream input.
if (dataStream instanceof GroupedDataStream) {
this.isGrouped = true;
this.keySelector = ((GroupedDataStream<OUT>) dataStream).keySelector;
+
} else {
this.isGrouped = false;
}
- this.dataStream = dataStream.copy();
- this.triggerPolicies = policyHelpers;
+ }
+
+ /**
+ * Creates a copy of the given {@link WindowedDataStream}. The underlying
+ * stream is copied, the grouping settings are taken over, and the trigger
+ * and eviction helper lists are copied into new lists.
+ *
+ * @param windowedDataStream
+ * The windowed stream to copy.
+ */
+ protected WindowedDataStream(WindowedDataStream<OUT> windowedDataStream) {
+ this.dataStream = windowedDataStream.dataStream.copy();
+ this.isGrouped = windowedDataStream.isGrouped;
+ this.keySelector = windowedDataStream.keySelector;
+ // Copy the helper lists instead of sharing them. Code that does
+ // "ret = this.copy(); ret.triggerPolicies.add(helper);" would otherwise
+ // also modify this instance (assuming copy() delegates to this constructor).
+ this.triggerPolicies = windowedDataStream.triggerPolicies == null ? null
+ : new ArrayList<WindowingHelper<OUT>>(windowedDataStream.triggerPolicies);
+ // evictionPolicies may legitimately be null (callers test for null), so keep that.
+ this.evictionPolicies = windowedDataStream.evictionPolicies == null ? null
+ : new ArrayList<WindowingHelper<OUT>>(windowedDataStream.evictionPolicies);
}
protected LinkedList<TriggerPolicy<OUT>> getTriggers() {
@@ -87,18 +108,43 @@ public class WindowedDataStream<OUT> {
return evictionPolicyList;
}
- protected WindowedDataStream(WindowedDataStream<OUT> windowedDataStream) {
- this.dataStream = windowedDataStream.dataStream.copy();
- this.isGrouped = windowedDataStream.isGrouped;
- this.keySelector = windowedDataStream.keySelector;
- this.triggerPolicies = windowedDataStream.triggerPolicies;
- this.evictionPolicies = windowedDataStream.evictionPolicies;
+ /**
+ * Collects the trigger policies that are treated as central: every
+ * {@link TimeTriggerPolicy} returned by {@link #getTriggers()}, in order.
+ *
+ * @return a new list containing only the time-based triggers
+ */
+ protected LinkedList<TriggerPolicy<OUT>> getCentralTriggers() {
+ LinkedList<TriggerPolicy<OUT>> central = new LinkedList<TriggerPolicy<OUT>>();
+ for (TriggerPolicy<OUT> policy : getTriggers()) {
+ if (!(policy instanceof TimeTriggerPolicy)) {
+ continue;
+ }
+ central.add(policy);
+ }
+ return central;
+ }
+
+ /**
+ * Collects every trigger policy from {@link #getTriggers()} that is not a
+ * {@link TimeTriggerPolicy}, cast to {@link CloneableTriggerPolicy}.
+ * NOTE(review): a non-time trigger that is not cloneable makes the cast
+ * fail with a ClassCastException.
+ *
+ * @return a new list of the distributed (non-time) triggers
+ */
+ protected LinkedList<CloneableTriggerPolicy<OUT>> getDistributedTriggers() {
+ LinkedList<CloneableTriggerPolicy<OUT>> distributed = new LinkedList<CloneableTriggerPolicy<OUT>>();
+ for (TriggerPolicy<OUT> policy : getTriggers()) {
+ boolean timeBased = policy instanceof TimeTriggerPolicy;
+ if (!timeBased) {
+ distributed.add((CloneableTriggerPolicy<OUT>) policy);
+ }
+ }
+ return distributed;
+ }
+
+ /**
+ * Returns all eviction policies from {@link #getEvicters()}, each cast to
+ * {@link CloneableEvictionPolicy}.
+ *
+ * @return a new list containing every eviction policy
+ */
+ protected LinkedList<CloneableEvictionPolicy<OUT>> getDistributedEvicters() {
+ LinkedList<CloneableEvictionPolicy<OUT>> result = new LinkedList<CloneableEvictionPolicy<OUT>>();
+ for (EvictionPolicy<OUT> policy : getEvicters()) {
+ CloneableEvictionPolicy<OUT> cloneable = (CloneableEvictionPolicy<OUT>) policy;
+ result.add(cloneable);
+ }
+ return result;
}
/**
* Groups the elements of the {@link WindowedDataStream} by the given
- * key position to be used with grouped operators.
+ * {@link KeySelector} to be used with grouped operators.
*
* @param keySelector
* The specification of the key on which the
@@ -109,7 +155,39 @@ public class WindowedDataStream<OUT> {
WindowedDataStream<OUT> ret = this.copy();
ret.dataStream = ret.dataStream.groupBy(keySelector);
ret.isGrouped = true;
- ret.keySelector = keySelector;
+ ret.keySelector = ((GroupedDataStream<OUT>) ret.dataStream).keySelector;
+ return ret;
+ }
+
+ /**
+ * Groups the elements of this {@link WindowedDataStream} by the fields at
+ * the given positions, for use with grouped operators.
+ *
+ * @param fields
+ * The positions of the fields to group by.
+ * @return A grouped copy of this {@link WindowedDataStream}
+ */
+ public WindowedDataStream<OUT> groupBy(int... fields) {
+ WindowedDataStream<OUT> grouped = this.copy();
+ grouped.dataStream = grouped.dataStream.groupBy(fields);
+ grouped.keySelector = ((GroupedDataStream<OUT>) grouped.dataStream).keySelector;
+ grouped.isGrouped = true;
+ return grouped;
+ }
+
+ /**
+ * Groups the elements of the {@link WindowedDataStream} by the given field
+ * expressions to be used with grouped operators.
+ *
+ * @param fields
+ * The field expressions to group by.
+ * @return The transformed {@link WindowedDataStream}
+ */
+ public WindowedDataStream<OUT> groupBy(String... fields) {
+ WindowedDataStream<OUT> ret = this.copy();
+ ret.dataStream = ret.dataStream.groupBy(fields);
+ ret.isGrouped = true;
+ ret.keySelector = ((GroupedDataStream<OUT>) ret.dataStream).keySelector;
return ret;
}
@@ -126,13 +204,242 @@ public class WindowedDataStream<OUT> {
* information
* @return The single output operator
*/
- public SingleOutputStreamOperator<Tuple2<OUT, String[]>, ?> reduce(
- ReduceFunction<OUT> reduceFunction) {
+ public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reduceFunction) {
+ // A reduce keeps the element type, so the stream's own type wrapper is
+ // used for both input and output; getReduceInvokable picks the grouped
+ // or ungrouped windowing invokable.
return dataStream.addFunction("NextGenWindowReduce", reduceFunction,
- new FunctionTypeWrapper<OUT>(reduceFunction, ReduceFunction.class, 0),
- new CombineTypeWrapper<OUT, String[]>(dataStream.outTypeWrapper,
- new ObjectTypeWrapper<String[]>(new String[] { "" })),
- new WindowingInvokable<OUT>(reduceFunction, getTriggers(), getEvicters()));
+ dataStream.outTypeWrapper, dataStream.outTypeWrapper,
+ getReduceInvokable(reduceFunction));
+ }
+
+ /**
+ * Applies a {@link GroupReduceFunction} to every sliding batch/window of
+ * the data stream. For grouped streams a key-aware windowing invokable is
+ * used (see {@link #getReduceGroupInvokable(GroupReduceFunction)}).
+ *
+ * @param reduceFunction
+ * The group reduce function applied to the elements of each
+ * window.
+ * @return The transformed DataStream. Its type information is derived
+ * from the function's output type parameter.
+ */
+ public <R> SingleOutputStreamOperator<R, ?> reduceGroup(
+ GroupReduceFunction<OUT, R> reduceFunction) {
+ // NOTE(review): uses the same operator name as reduce(); consider a distinct name.
+ return dataStream.addFunction("NextGenWindowReduce", reduceFunction,
+ dataStream.outTypeWrapper, new FunctionTypeWrapper<R>(reduceFunction,
+ GroupReduceFunction.class, 1), getReduceGroupInvokable(reduceFunction));
+ }
+
+ /**
+ * Sums the field at the given position over every sliding batch/window of
+ * the data stream.
+ *
+ * @param positionToSum
+ * The position in the data point to sum
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<OUT, ?> sum(int positionToSum) {
+ dataStream.checkFieldRange(positionToSum);
+ AggregationFunction<OUT> sumFunction = (AggregationFunction<OUT>) SumAggregator.getSumFunction(
+ positionToSum, dataStream.getClassAtPos(positionToSum), dataStream.getOutputType());
+ return aggregate(sumFunction);
+ }
+
+ /**
+ * Shorthand for {@code sum(0)}: sums the field at position 0 of every
+ * sliding batch/window.
+ *
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<OUT, ?> sum() {
+ final int firstField = 0;
+ return this.sum(firstField);
+ }
+
+ /**
+ * Applies an aggregation that gives the sum of every sliding batch/window
+ * of the pojo data stream at the given field expression. A field
+ * expression is either the name of a public field or a getter method with
+ * parentheses of the {@link DataStream}'s underlying type. A dot can be
+ * used to drill down into objects, as in {@code "field1.getInnerField2()" }.
+ *
+ * @param field
+ * The field expression based on which the aggregation will be
+ * applied.
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<OUT, ?> sum(String field) {
+ return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field,
+ getOutputType()));
+ }
+
+ /**
+ * Applies an aggregation that gives the minimum of every sliding
+ * batch/window of the data stream at the given position.
+ *
+ * @param positionToMin
+ * The position in the data point to minimize
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<OUT, ?> min(int positionToMin) {
+ dataStream.checkFieldRange(positionToMin);
+ return aggregate(ComparableAggregator.getAggregator(positionToMin, getOutputType(),
+ AggregationType.MIN));
+ }
+
+ /**
+ * Selects, for every sliding batch/window, the element with the minimum
+ * value at the given position. Ties resolve to the first such element;
+ * equivalent to {@code minBy(positionToMinBy, true)}.
+ *
+ * @param positionToMinBy
+ * The position in the data point to minimize
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy) {
+ final boolean keepFirst = true;
+ return minBy(positionToMinBy, keepFirst);
+ }
+
+ /**
+ * Applies an aggregation that gives the minimum element of every sliding
+ * batch/window of the data stream by the given position. If more elements
+ * have the same minimum value, the operator returns either the first or
+ * the last one depending on the parameter setting.
+ *
+ * @param positionToMinBy
+ * The position in the data point to minimize
+ * @param first
+ * If true, the operator returns the first element with the
+ * minimum value, otherwise it returns the last
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy, boolean first) {
+ dataStream.checkFieldRange(positionToMinBy);
+ return aggregate(ComparableAggregator.getAggregator(positionToMinBy, getOutputType(),
+ AggregationType.MINBY, first));
+ }
+
+ /**
+ * Shorthand for {@code min(0)}: the minimum of the field at position 0 of
+ * every sliding batch/window.
+ *
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<OUT, ?> min() {
+ final int firstField = 0;
+ return this.min(firstField);
+ }
+
+ /**
+ * Applies an aggregation that gives the maximum of every sliding
+ * batch/window of the data stream at the given position.
+ *
+ * @param positionToMax
+ * The position in the data point to maximize
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<OUT, ?> max(int positionToMax) {
+ dataStream.checkFieldRange(positionToMax);
+ return aggregate(ComparableAggregator.getAggregator(positionToMax, getOutputType(),
+ AggregationType.MAX));
+ }
+
+ /**
+ * Selects, for every sliding batch/window, the element with the maximum
+ * value at the given position. Ties resolve to the first such element;
+ * equivalent to {@code maxBy(positionToMaxBy, true)}.
+ *
+ * @param positionToMaxBy
+ * The position in the data point to maximize
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy) {
+ final boolean keepFirst = true;
+ return maxBy(positionToMaxBy, keepFirst);
+ }
+
+ /**
+ * Applies an aggregation that gives the maximum element of every sliding
+ * batch/window of the data stream by the given position. If more elements
+ * have the same maximum value, the operator returns either the first or
+ * the last one depending on the parameter setting.
+ *
+ * @param positionToMaxBy
+ * The position in the data point to maximize
+ * @param first
+ * If true, the operator returns the first element with the
+ * maximum value, otherwise it returns the last
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy, boolean first) {
+ dataStream.checkFieldRange(positionToMaxBy);
+ return aggregate(ComparableAggregator.getAggregator(positionToMaxBy, getOutputType(),
+ AggregationType.MAXBY, first));
+ }
+
+ /**
+ * Shorthand for {@code max(0)}: the maximum of the field at position 0 of
+ * every sliding batch/window.
+ *
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<OUT, ?> max() {
+ final int firstField = 0;
+ return this.max(firstField);
+ }
+
+ /**
+ * Applies an aggregation that gives the minimum of every sliding
+ * batch/window of the pojo data stream at the given field expression. A
+ * field expression is either the name of a public field or a getter method
+ * with parentheses of the {@link DataStream}'s underlying type. A dot can
+ * be used to drill down into objects, as in
+ * {@code "field1.getInnerField2()" }.
+ *
+ * @param field
+ * The field expression based on which the aggregation will be
+ * applied.
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<OUT, ?> min(String field) {
+ return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
+ AggregationType.MIN, false));
+ }
+
+ /**
+ * Applies an aggregation that gives the maximum of every sliding
+ * batch/window of the pojo data stream at the given field expression. A
+ * field expression is either the name of a public field or a getter method
+ * with parentheses of the {@link DataStream}'s underlying type. A dot can
+ * be used to drill down into objects, as in
+ * {@code "field1.getInnerField2()" }.
+ *
+ * @param field
+ * The field expression based on which the aggregation will be
+ * applied.
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<OUT, ?> max(String field) {
+ return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
+ AggregationType.MAX, false));
+ }
+
+ /**
+ * Applies an aggregation that gives the minimum element of every sliding
+ * batch/window of the pojo data stream by the given field expression. A
+ * field expression is either the name of a public field or a getter method
+ * with parentheses of the {@link DataStream}'s underlying type. A dot can
+ * be used to drill down into objects, as in
+ * {@code "field1.getInnerField2()" }.
+ *
+ * @param field
+ * The field expression based on which the aggregation will be
+ * applied.
+ * @param first
+ * If true, then in case of field equality the first object will
+ * be returned
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<OUT, ?> minBy(String field, boolean first) {
+ return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
+ AggregationType.MINBY, first));
+ }
+
+ /**
+ * Applies an aggregation that gives the maximum element of every sliding
+ * batch/window of the pojo data stream by the given field expression. A
+ * field expression is either the name of a public field or a getter method
+ * with parentheses of the {@link DataStream}'s underlying type. A dot can
+ * be used to drill down into objects, as in
+ * {@code "field1.getInnerField2()" }.
+ *
+ * @param field
+ * The field expression based on which the aggregation will be
+ * applied.
+ * @param first
+ * If true, then in case of field equality the first object will
+ * be returned
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<OUT, ?> maxBy(String field, boolean first) {
+ return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
+ AggregationType.MAXBY, first));
}
/**
@@ -153,11 +460,45 @@ public class WindowedDataStream<OUT> {
WindowedDataStream<OUT> ret = this.copy();
if (ret.evictionPolicies == null) {
ret.evictionPolicies = ret.triggerPolicies;
- ret.triggerPolicies = policyHelpers;
- } else {
- ret.triggerPolicies = (WindowingHelper<OUT>[]) ArrayUtils.addAll(triggerPolicies,
- policyHelpers);
+ ret.triggerPolicies = new ArrayList<WindowingHelper<OUT>>();
+ }
+ for (WindowingHelper<OUT> helper : policyHelpers) {
+ ret.triggerPolicies.add(helper);
}
return ret;
}
+
+ /**
+ * Runs the given aggregation over every window. The aggregator is wrapped
+ * in the reduce invokable chosen by getReduceInvokable, and input and
+ * output share the stream's type wrapper.
+ *
+ * @param aggregator
+ * The aggregation applied as a reduce function.
+ * @return The transformed DataStream.
+ */
+ private SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregator) {
+ StreamInvokable<OUT, OUT> windowInvokable = getReduceInvokable(aggregator);
+ return dataStream.addFunction("windowReduce", aggregator, dataStream.outTypeWrapper,
+ dataStream.outTypeWrapper, windowInvokable);
+ }
+
+ /**
+ * Builds the invokable that runs a group reduce over the windows.
+ * Ungrouped streams get a WindowingGroupInvokable over all triggers and
+ * evicters. Grouped streams get a GroupedWindowingInvokable that receives
+ * the key selector, the distributed triggers and evicters, and the central
+ * (time) triggers.
+ *
+ * @param reducer
+ * The group reduce function to wrap.
+ * @return The windowing invokable for this stream.
+ */
+ protected <R> StreamInvokable<OUT, R> getReduceGroupInvokable(
+ GroupReduceFunction<OUT, R> reducer) {
+ if (!isGrouped) {
+ return new WindowingGroupInvokable<OUT, R>(reducer, getTriggers(), getEvicters());
+ }
+ return new GroupedWindowingInvokable<OUT, R>(reducer, keySelector,
+ getDistributedTriggers(), getDistributedEvicters(), getCentralTriggers());
+ }
+
+ /**
+ * Builds the invokable that runs a reduce over the windows. Ungrouped
+ * streams get a WindowingReduceInvokable over all triggers and evicters.
+ * Grouped streams get a GroupedWindowingInvokable that receives the key
+ * selector, the distributed triggers and evicters, and the central (time)
+ * triggers.
+ *
+ * @param reducer
+ * The reduce function to wrap.
+ * @return The windowing invokable for this stream.
+ */
+ protected StreamInvokable<OUT, OUT> getReduceInvokable(ReduceFunction<OUT> reducer) {
+ if (!isGrouped) {
+ return new WindowingReduceInvokable<OUT>(reducer, getTriggers(), getEvicters());
+ }
+ return new GroupedWindowingInvokable<OUT, OUT>(reducer, keySelector,
+ getDistributedTriggers(), getDistributedEvicters(), getCentralTriggers());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java
deleted file mode 100755
index 134c33d..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import java.io.Serializable;
-
-import org.apache.commons.math.util.MathUtils;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.state.CircularFifoList;
-
-/**
- * Applies a {@link GroupReduceFunction} to sliding batches of records. A batch
- * holds {@code batchSize} records and slides by {@code slideSize} records;
- * records are buffered in mini-batches of {@code granularity} =
- * gcd(batchSize, slideSize) records.
- */
-public class BatchGroupReduceInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
-
- private static final long serialVersionUID = 1L;
- protected GroupReduceFunction<IN, OUT> reducer;
-
- protected long slideSize;
-
- protected long batchSize;
- // Derived in the constructor: granularity = gcd(batchSize, slideSize);
- // batchPerSlide and numberOfBatches are counted in mini-batches of that size.
- protected int granularity;
- protected int batchPerSlide;
- protected StreamBatch batch;
- // The batch handed to callUserFunction() by reduce(StreamBatch).
- protected StreamBatch currentBatch;
- protected long numberOfBatches;
-
- public BatchGroupReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long batchSize,
- long slideSize) {
- super(reduceFunction);
- this.reducer = reduceFunction;
- this.batchSize = batchSize;
- this.slideSize = slideSize;
- this.granularity = (int) MathUtils.gcd(batchSize, slideSize);
- this.batchPerSlide = (int) (slideSize / granularity);
- this.numberOfBatches = batchSize / granularity;
- this.batch = new StreamBatch();
- }
-
- /**
- * Buffers every incoming record into the batch chosen by getBatch() and
- * reduces the trailing batch once the input is exhausted.
- *
- * @throws RuntimeException if the stream has no records at all
- */
- @Override
- protected void immutableInvoke() throws Exception {
- if ((reuse = recordIterator.next(reuse)) == null) {
- throw new RuntimeException("DataStream must not be empty");
- }
-
- while (reuse != null) {
- StreamBatch batch = getBatch(reuse);
- batch.addToBuffer(reuse.getObject());
-
- resetReuse();
- reuse = recordIterator.next(reuse);
- }
-
- reduceLastBatch();
- }
-
- @Override
- // TODO: implement mutableInvoke for reduce
- // Prints a notice to stdout and delegates to immutableInvoke().
- protected void mutableInvoke() throws Exception {
- System.out.println("Immutable setting is used");
- immutableInvoke();
- }
-
- // Returns the single shared batch; subclasses may override this to select
- // a batch per record.
- protected StreamBatch getBatch(StreamRecord<IN> next) {
- return batch;
- }
-
- // Makes the given batch current and invokes the user function on it.
- protected void reduce(StreamBatch batch) {
- this.currentBatch = batch;
- callUserFunctionAndLogException();
- }
-
- protected void reduceLastBatch() {
- batch.reduceLastBatch();
- }
-
- // Empty batches are skipped, so the reducer never sees an empty iterable.
- @Override
- protected void callUserFunction() throws Exception {
- if(!currentBatch.circularList.isEmpty()){
- reducer.reduce(currentBatch.circularList.getIterable(), collector);
- }
- }
-
- /** Sliding buffer of records backed by a {@link CircularFifoList}. */
- protected class StreamBatch implements Serializable {
-
- private static final long serialVersionUID = 1L;
- // Records added since the last mini-batch boundary.
- private long counter;
- // Mini-batches currently counted towards the next full batch.
- protected long minibatchCounter;
-
- protected CircularFifoList<IN> circularList;
-
- public StreamBatch() {
- this.circularList = new CircularFifoList<IN>();
- this.counter = 0;
- this.minibatchCounter = 0;
- }
-
- // Every granularity-th record closes a mini-batch. When numberOfBatches
- // mini-batches are buffered, the batch is reduced and the window is
- // shifted by batchPerSlide mini-batches.
- public void addToBuffer(IN nextValue) throws Exception {
- circularList.add(nextValue);
-
- counter++;
-
- if (miniBatchEnd()) {
- circularList.newSlide();
- minibatchCounter++;
- if (batchEnd()) {
- reduceBatch();
- circularList.shiftWindow(batchPerSlide);
- }
- }
-
- }
-
- // NOTE: resets counter to 0 as a side effect when it returns true.
- protected boolean miniBatchEnd() {
- if( (counter % granularity) == 0){
- counter = 0;
- return true;
- }else{
- return false;
- }
- }
-
-
- // Rewinds minibatchCounter by batchPerSlide when a full batch is reached.
- public boolean batchEnd() {
- if (minibatchCounter == numberOfBatches) {
- minibatchCounter -= batchPerSlide;
- return true;
- }
- return false;
- }
-
- public void reduceBatch() {
- reduce(this);
- }
-
- // Reduces the batch unless counter sits on a mini-batch boundary.
- // NOTE(review): when the input ends exactly on a boundary (including
- // right after a full batch was reduced) nothing more is emitted here.
- public void reduceLastBatch() {
- if (!miniBatchEnd()) {
- reduceBatch();
- }
- }
-
- @Override
- public String toString(){
- return circularList.toString();
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
deleted file mode 100755
index 6b22555..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.commons.math.util.MathUtils;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.state.NullableCircularBuffer;
-
-/**
- * Incrementally applies a {@link ReduceFunction} over sliding batches of
- * {@code batchSize} records that slide by {@code slideSize} records. Each
- * mini-batch of {@code granularity} = gcd(batchSize, slideSize) records is
- * pre-reduced to one value and stored in a circular buffer with
- * batchSize / granularity slots; a full batch is reduced from those values.
- */
-public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT, OUT> {
-
- private static final long serialVersionUID = 1L;
- protected ReduceFunction<OUT> reducer;
-
- protected long slideSize;
-
- protected long batchSize;
- protected int granularity;
- protected long batchPerSlide;
- protected long numberOfBatches;
- // Created in open(), not in the constructor.
- protected StreamBatch batch;
- protected StreamBatch currentBatch;
-
- // Set in open(); used to copy values before they are passed to the reducer.
- protected TypeSerializer<OUT> serializer;
-
- public BatchReduceInvokable(ReduceFunction<OUT> reduceFunction, long batchSize, long slideSize) {
- super(reduceFunction);
- this.reducer = reduceFunction;
- this.batchSize = batchSize;
- this.slideSize = slideSize;
- this.granularity = (int) MathUtils.gcd(batchSize, slideSize);
- this.batchPerSlide = slideSize / granularity;
- this.numberOfBatches = batchSize / granularity;
- }
-
- /**
- * Folds every incoming record into the batch chosen by getBatch() and
- * reduces the trailing batch once the input is exhausted.
- *
- * @throws RuntimeException if the stream has no records at all
- */
- @Override
- protected void immutableInvoke() throws Exception {
- if ((reuse = recordIterator.next(reuse)) == null) {
- throw new RuntimeException("DataStream must not be empty");
- }
-
- while (reuse != null) {
- StreamBatch batch = getBatch(reuse);
-
- batch.reduceToBuffer(reuse.getObject());
-
- resetReuse();
- reuse = recordIterator.next(reuse);
- }
-
- reduceLastBatch();
-
- }
-
- // Prints a notice to stdout and delegates to immutableInvoke().
- @Override
- protected void mutableInvoke() throws Exception {
- System.out.println("Immutable setting is used");
- immutableInvoke();
- }
-
- // Returns the single shared batch; subclasses may override this to select
- // a batch per record.
- protected StreamBatch getBatch(StreamRecord<OUT> next) {
- return batch;
- }
-
- // Makes the given batch current and invokes the user function on it.
- protected void reduce(StreamBatch batch) {
- this.currentBatch = batch;
- callUserFunctionAndLogException();
- }
-
- protected void reduceLastBatch() throws Exception {
- batch.reduceLastBatch();
- }
-
- // Folds the non-null partial results of currentBatch (copying both
- // operands first) and emits the result, if there is one.
- @Override
- protected void callUserFunction() throws Exception {
- Iterator<OUT> reducedIterator = currentBatch.getIterator();
- OUT reduced = null;
-
- while (reducedIterator.hasNext() && reduced == null) {
- reduced = reducedIterator.next();
- }
-
- while (reducedIterator.hasNext()) {
- OUT next = reducedIterator.next();
- if (next != null) {
- reduced = reducer.reduce(serializer.copy(reduced), serializer.copy(next));
- }
- }
- if (reduced != null) {
- collector.collect(reduced);
- }
- }
-
- /** Circular buffer of pre-reduced mini-batch values. */
- protected class StreamBatch implements Serializable {
-
- private static final long serialVersionUID = 1L;
- // Records folded into currentValue since the last mini-batch boundary.
- protected long counter;
- protected long minibatchCounter;
- // Running reduction of the mini-batch in progress (null when empty).
- protected OUT currentValue;
- // True when a mini-batch was buffered after the last batch reduction.
- boolean changed;
-
- protected NullableCircularBuffer circularBuffer;
-
- public StreamBatch() {
-
- this.circularBuffer = new NullableCircularBuffer((int) (batchSize / granularity));
- this.counter = 0;
- this.minibatchCounter = 0;
- this.changed = false;
- }
-
- // Folds the record into the current mini-batch value, closes the
- // mini-batch every granularity records, and reduces the whole batch
- // once it is complete.
- public void reduceToBuffer(OUT nextValue) throws Exception {
-
- if (currentValue != null) {
- currentValue = reducer.reduce(serializer.copy(currentValue), serializer.copy(nextValue));
- } else {
- currentValue = nextValue;
- }
-
- counter++;
-
- if (miniBatchEnd()) {
- addToBuffer();
- if (batchEnd()) {
- reduceBatch();
- }
- }
-
- }
-
- // Moves the running mini-batch value into the circular buffer.
- protected void addToBuffer() {
- circularBuffer.add(currentValue);
- changed = true;
- minibatchCounter++;
- currentValue = null;
- }
-
- // NOTE: resets counter to 0 as a side effect when it returns true.
- protected boolean miniBatchEnd() {
- if ((counter % granularity) == 0) {
- counter = 0;
- return true;
- } else {
- return false;
- }
- }
-
- // Rewinds minibatchCounter by batchPerSlide when a full batch is reached.
- public boolean batchEnd() {
- if (minibatchCounter == numberOfBatches) {
- minibatchCounter -= batchPerSlide;
- return true;
- }
- return false;
- }
-
- // Flushes a partially filled mini-batch. Then, if something was buffered
- // since the last reduction, drops (numberOfBatches - minibatchCounter)
- // entries from a full buffer and reduces whatever remains.
- public void reduceLastBatch() throws Exception {
-
- if (miniBatchInProgress()) {
- addToBuffer();
- }
- if (changed == true && minibatchCounter >= 0) {
- if (circularBuffer.isFull()) {
- for (long i = 0; i < (numberOfBatches - minibatchCounter); i++) {
- if (!circularBuffer.isEmpty()) {
- circularBuffer.remove();
- }
- }
- }
- if (!circularBuffer.isEmpty()) {
- reduce(this);
- }
- }
- }
-
- public boolean miniBatchInProgress() {
- return currentValue != null;
- }
-
- public void reduceBatch() {
- reduce(this);
- changed = false;
- }
-
- // Unchecked because NullableCircularBuffer is used as a raw type.
- @SuppressWarnings("unchecked")
- public Iterator<OUT> getIterator() {
- return circularBuffer.iterator();
- }
-
- @Override
- public String toString() {
- return circularBuffer.toString();
- }
-
- }
-
-
- // Obtains the record serializer and creates the shared batch.
- @Override
- public void open(Configuration config) throws Exception{
- super.open(config);
- serializer = inSerializer.getObjectSerializer();
- this.batch = new StreamBatch();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceInvokable.java
deleted file mode 100755
index 3564829..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceInvokable.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-
-/**
- * Grouped variant of {@link BatchGroupReduceInvokable}: keeps a separate
- * StreamBatch for every key extracted with {@code keySelector}.
- */
-public class GroupedBatchGroupReduceInvokable<IN, OUT> extends BatchGroupReduceInvokable<IN, OUT> {
-
- private static final long serialVersionUID = 1L;
-
- // One batch per key, created lazily in getBatch().
- Map<Object, StreamBatch> streamBatches;
- KeySelector<IN, ?> keySelector;
-
-
- public GroupedBatchGroupReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long batchSize,
- long slideSize, KeySelector<IN, ?> keySelector) {
- super(reduceFunction, batchSize, slideSize);
- this.keySelector = keySelector;
- this.streamBatches = new HashMap<Object, StreamBatch>();
- }
-
- // Returns the batch for the record's key, creating it on first use.
- @Override
- protected StreamBatch getBatch(StreamRecord<IN> next) {
- Object key = next.getKey(keySelector);
- StreamBatch batch = streamBatches.get(key);
- if(batch == null){
- batch=new StreamBatch();
- streamBatches.put(key, batch);
- }
- return batch;
- }
-
- // Flushes the trailing batch of every key.
- @Override
- protected void reduceLastBatch() {
- for(StreamBatch batch: streamBatches.values()){
- batch.reduceLastBatch();
- }
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceInvokable.java
deleted file mode 100755
index cfb128c..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceInvokable.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-
-public class GroupedBatchReduceInvokable<OUT> extends BatchReduceInvokable<OUT> {
-
- private static final long serialVersionUID = 1L;
- KeySelector<OUT, ?> keySelector;
- Map<Object, StreamBatch> streamBatches;
-
- public GroupedBatchReduceInvokable(ReduceFunction<OUT> reduceFunction, long batchSize,
- long slideSize, KeySelector<OUT, ?> keySelector) {
- super(reduceFunction, batchSize, slideSize);
- this.keySelector = keySelector;
- this.streamBatches = new HashMap<Object, StreamBatch>();
- }
-
- @Override
- protected StreamBatch getBatch(StreamRecord<OUT> next) {
- Object key = next.getKey(keySelector);
- StreamBatch batch = streamBatches.get(key);
- if (batch == null) {
- batch = new StreamBatch();
- streamBatches.put(key, batch);
- }
- return batch;
- }
-
- @Override
- protected void reduceLastBatch() throws Exception {
- for (StreamBatch batch : streamBatches.values()) {
- batch.reduceLastBatch();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokable.java
deleted file mode 100755
index 0641ae3..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokable.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-
-public class GroupedWindowGroupReduceInvokable<IN, OUT> extends WindowGroupReduceInvokable<IN, OUT> {
-
- private static final long serialVersionUID = 1L;
-
- KeySelector<IN, ?> keySelector;
- Map<Object, StreamWindow> streamWindows;
- List<Object> cleanList;
- long currentMiniBatchCount = 0;
-
- public GroupedWindowGroupReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction,
- long windowSize, long slideInterval, KeySelector<IN, ?> keySelector,
- TimeStamp<IN> timestamp) {
- super(reduceFunction, windowSize, slideInterval, timestamp);
- this.keySelector = keySelector;
- this.reducer = reduceFunction;
- this.streamWindows = new HashMap<Object, StreamWindow>();
- }
-
- @Override
- protected StreamBatch getBatch(StreamRecord<IN> next) {
- Object key = next.getKey(keySelector);
- StreamWindow window = streamWindows.get(key);
- if (window == null) {
- window = new GroupedStreamWindow();
- for (int i = 0; i < currentMiniBatchCount; i++) {
- window.circularList.newSlide();
- }
- streamWindows.put(key, window);
- }
- this.window = window;
- return window;
- }
-
- @Override
- protected void reduceLastBatch() {
- for (StreamBatch window : streamWindows.values()) {
- window.reduceLastBatch();
- }
- }
-
- private void shiftGranularityAllWindows() {
- for (StreamBatch window : streamWindows.values()) {
- window.circularList.newSlide();
- }
- }
-
- private void slideAllWindows() {
- currentMiniBatchCount -= batchPerSlide;
- for (StreamBatch window : streamWindows.values()) {
- window.circularList.shiftWindow(batchPerSlide);
- }
- }
-
- private void reduceAllWindows() {
- for (StreamBatch window : streamWindows.values()) {
- window.reduceBatch();
- }
- }
-
- protected class GroupedStreamWindow extends StreamWindow {
-
- private static final long serialVersionUID = 1L;
-
- public GroupedStreamWindow() {
- super();
- }
-
- @Override
- public void addToBuffer(IN nextValue) throws Exception {
- checkWindowEnd(timestamp.getTimestamp(nextValue));
- if (currentMiniBatchCount >= 0) {
- circularList.add(nextValue);
- }
- }
-
- @Override
- protected synchronized void checkWindowEnd(long timeStamp) {
- nextRecordTime = timeStamp;
-
- while (miniBatchEnd()) {
- shiftGranularityAllWindows();
- currentMiniBatchCount += 1;
- if (batchEnd()) {
- reduceAllWindows();
- slideAllWindows();
- }
- }
- }
-
- @Override
- public boolean batchEnd() {
- if (currentMiniBatchCount == numberOfBatches) {
- return true;
- }
- return false;
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowReduceInvokable.java
deleted file mode 100644
index 2248096..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowReduceInvokable.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-
-public class GroupedWindowReduceInvokable<OUT> extends WindowReduceInvokable<OUT> {
-
- private static final long serialVersionUID = 1L;
- private KeySelector<OUT, ?> keySelector;
- private Map<Object, StreamWindow> streamWindows;
- private long currentMiniBatchCount = 0;
-
- public GroupedWindowReduceInvokable(ReduceFunction<OUT> reduceFunction, long windowSize,
- long slideInterval, KeySelector<OUT, ?> keySelector, TimeStamp<OUT> timestamp) {
- super(reduceFunction, windowSize, slideInterval, timestamp);
- this.keySelector = keySelector;
- this.streamWindows = new HashMap<Object, StreamWindow>();
- }
-
- @Override
- protected StreamBatch getBatch(StreamRecord<OUT> next) {
- Object key = next.getKey(keySelector);
- StreamWindow window = streamWindows.get(key);
- if (window == null) {
- window = new GroupedStreamWindow();
- window.minibatchCounter = currentMiniBatchCount;
- streamWindows.put(key, window);
- }
- this.window = window;
- return window;
- }
-
- private void addToAllBuffers() {
- for (StreamBatch window : streamWindows.values()) {
- window.addToBuffer();
- }
- }
-
- private void reduceAllWindows() {
- for (StreamBatch window : streamWindows.values()) {
- window.minibatchCounter -= batchPerSlide;
- window.reduceBatch();
- }
- }
-
- @Override
- protected void reduceLastBatch() throws Exception {
- for (StreamBatch window : streamWindows.values()) {
- window.reduceLastBatch();
- }
- }
-
- protected class GroupedStreamWindow extends StreamWindow {
-
- private static final long serialVersionUID = 1L;
-
- public GroupedStreamWindow() {
- super();
- }
-
- @Override
- protected synchronized void checkWindowEnd(long timeStamp) {
- nextRecordTime = timeStamp;
-
- while (miniBatchEnd()) {
- addToAllBuffers();
- if (batchEnd()) {
- reduceAllWindows();
- }
- }
- currentMiniBatchCount = this.minibatchCounter;
- }
-
- @Override
- public boolean batchEnd() {
- if (minibatchCounter == numberOfBatches) {
- return true;
- }
- return false;
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokable.java
index 7fc35ca..e957b8c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokable.java
@@ -21,9 +21,10 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
@@ -69,7 +70,7 @@ import org.slf4j.LoggerFactory;
* @param <IN>
* The type of input elements handled by this operator invokable.
*/
-public class GroupedWindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN, String[]>> {
+public class GroupedWindowingInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
/**
* Auto-generated serial version UID
@@ -84,7 +85,7 @@ public class GroupedWindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN
private LinkedList<TriggerPolicy<IN>> centralTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
private LinkedList<CloneableTriggerPolicy<IN>> distributedTriggerPolicies = new LinkedList<CloneableTriggerPolicy<IN>>();
private LinkedList<CloneableEvictionPolicy<IN>> distributedEvictionPolicies = new LinkedList<CloneableEvictionPolicy<IN>>();
- private Map<Object, WindowingInvokable<IN>> windowingGroups = new HashMap<Object, WindowingInvokable<IN>>();
+ private Map<Object, WindowingInvokable<IN, OUT>> windowingGroups = new HashMap<Object, WindowingInvokable<IN, OUT>>();
private LinkedList<Thread> activePolicyThreads = new LinkedList<Thread>();
private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
@@ -117,7 +118,7 @@ public class GroupedWindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN
* that it forwards/distributed calls all groups.
*
* @param userFunction
- * The user defined {@link ReduceFunction}.
+ * The user defined function.
* @param keySelector
* A key selector to extract the key for the groups from the
* input data.
@@ -137,7 +138,7 @@ public class GroupedWindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN
* If only one element is contained a group, this element itself
* is returned as aggregated result.)
*/
- public GroupedWindowingInvokable(ReduceFunction<IN> userFunction,
+ public GroupedWindowingInvokable(Function userFunction,
KeySelector<IN, ?> keySelector,
LinkedList<CloneableTriggerPolicy<IN>> distributedTriggerPolicies,
LinkedList<CloneableEvictionPolicy<IN>> distributedEvictionPolicies,
@@ -165,8 +166,8 @@ public class GroupedWindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN
// Continuously run
while (reuse != null) {
- WindowingInvokable<IN> groupInvokable = windowingGroups.get(keySelector.getKey(reuse
- .getObject()));
+ WindowingInvokable<IN, OUT> groupInvokable = windowingGroups.get(keySelector
+ .getKey(reuse.getObject()));
if (groupInvokable == null) {
groupInvokable = makeNewGroup(reuse);
}
@@ -175,7 +176,7 @@ public class GroupedWindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN
for (ActiveTriggerPolicy<IN> trigger : activeCentralTriggerPolicies) {
IN[] result = trigger.preNotifyTrigger(reuse.getObject());
for (IN in : result) {
- for (WindowingInvokable<IN> group : windowingGroups.values()) {
+ for (WindowingInvokable<IN, OUT> group : windowingGroups.values()) {
group.processFakeElement(in, trigger);
}
}
@@ -193,7 +194,7 @@ public class GroupedWindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN
groupInvokable.processRealElement(reuse.getObject());
} else {
// call user function for all groups
- for (WindowingInvokable<IN> group : windowingGroups.values()) {
+ for (WindowingInvokable<IN, OUT> group : windowingGroups.values()) {
if (group == groupInvokable) {
// process real with initialized policies
group.processRealElement(reuse.getObject(), currentTriggerPolicies);
@@ -219,7 +220,7 @@ public class GroupedWindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN
}
// finally trigger the buffer.
- for (WindowingInvokable<IN> group : windowingGroups.values()) {
+ for (WindowingInvokable<IN, OUT> group : windowingGroups.values()) {
group.emitFinalWindow(centralTriggerPolicies);
}
@@ -239,7 +240,8 @@ public class GroupedWindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN
* {@link KeySelector#getKey(Object)}, the exception is not
* catched by this method.
*/
- private WindowingInvokable<IN> makeNewGroup(StreamRecord<IN> element) throws Exception {
+ @SuppressWarnings("unchecked")
+ private WindowingInvokable<IN, OUT> makeNewGroup(StreamRecord<IN> element) throws Exception {
// clone the policies
LinkedList<TriggerPolicy<IN>> clonedDistributedTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
LinkedList<EvictionPolicy<IN>> clonedDistributedEvictionPolicies = new LinkedList<EvictionPolicy<IN>>();
@@ -250,10 +252,17 @@ public class GroupedWindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN
clonedDistributedEvictionPolicies.add(eviction.clone());
}
- @SuppressWarnings("unchecked")
- WindowingInvokable<IN> groupInvokable = new WindowingInvokable<IN>(
- (ReduceFunction<IN>) userFunction, clonedDistributedTriggerPolicies,
- clonedDistributedEvictionPolicies);
+ WindowingInvokable<IN, OUT> groupInvokable;
+ if (userFunction instanceof ReduceFunction) {
+ groupInvokable = (WindowingInvokable<IN, OUT>) new WindowingReduceInvokable<IN>(
+ (ReduceFunction<IN>) userFunction, clonedDistributedTriggerPolicies,
+ clonedDistributedEvictionPolicies);
+ } else {
+ groupInvokable = new WindowingGroupInvokable<IN, OUT>(
+ (GroupReduceFunction<IN, OUT>) userFunction, clonedDistributedTriggerPolicies,
+ clonedDistributedEvictionPolicies);
+ }
+
groupInvokable.initialize(collector, recordIterator, inSerializer, isMutable);
groupInvokable.open(this.parameters);
windowingGroups.put(keySelector.getKey(element.getObject()), groupInvokable);
@@ -305,7 +314,7 @@ public class GroupedWindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN
@Override
public void sendFakeElement(IN datapoint) {
- for (WindowingInvokable<IN> group : windowingGroups.values()) {
+ for (WindowingInvokable<IN, OUT> group : windowingGroups.values()) {
group.processFakeElement(datapoint, policy);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
deleted file mode 100755
index c69816d4..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-
-public class WindowGroupReduceInvokable<IN, OUT> extends BatchGroupReduceInvokable<IN, OUT> {
-
- private static final long serialVersionUID = 1L;
- private long startTime;
- protected long nextRecordTime;
- protected TimeStamp<IN> timestamp;
- protected StreamWindow window;
-
- public WindowGroupReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long windowSize,
- long slideInterval, TimeStamp<IN> timestamp) {
- super(reduceFunction, windowSize, slideInterval);
- this.timestamp = timestamp;
- this.startTime = timestamp.getStartTime();
- this.window = new StreamWindow();
- this.batch = this.window;
- }
-
- @Override
- public void open(Configuration config) throws Exception {
- super.open(config);
- if (timestamp instanceof DefaultTimeStamp) {
- (new TimeCheck()).start();
- }
- }
-
- protected class StreamWindow extends StreamBatch {
-
- private static final long serialVersionUID = 1L;
-
- public StreamWindow() {
- super();
- }
-
- @Override
- public void addToBuffer(IN nextValue) throws Exception {
- checkWindowEnd(timestamp.getTimestamp(nextValue));
- if (minibatchCounter >= 0) {
- circularList.add(nextValue);
- }
- }
-
- protected synchronized void checkWindowEnd(long timeStamp) {
- nextRecordTime = timeStamp;
-
- while (miniBatchEnd()) {
- circularList.newSlide();
- minibatchCounter++;
- if (batchEnd()) {
- reduceBatch();
- circularList.shiftWindow(batchPerSlide);
- }
- }
- }
-
- @Override
- protected boolean miniBatchEnd() {
- if (nextRecordTime < startTime + granularity) {
- return false;
- } else {
- startTime += granularity;
- return true;
- }
- }
-
- }
-
- private class TimeCheck extends Thread {
- @Override
- public void run() {
- while (true) {
- try {
- Thread.sleep(slideSize);
- } catch (InterruptedException e) {
- }
- if (isRunning) {
- window.checkWindowEnd(System.currentTimeMillis());
- } else {
- break;
- }
- }
- }
- }
-
-}