Posted to commits@flink.apache.org by mb...@apache.org on 2014/09/20 15:10:49 UTC
[06/18] git commit: [streaming] window and batch operator added to DataStream + Documentation updated accordingly
[streaming] window and batch operator added to DataStream + Documentation updated accordingly
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/5f601cf9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/5f601cf9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/5f601cf9
Branch: refs/heads/master
Commit: 5f601cf9b18fef0b54a92e42405c0179e639f5da
Parents: 47d02a0
Author: gyfora <gy...@gmail.com>
Authored: Mon Sep 8 02:05:20 2014 +0200
Committer: mbalassi <ba...@gmail.com>
Committed: Sat Sep 20 13:42:04 2014 +0200
----------------------------------------------------------------------
docs/streaming_guide.md | 50 ++--
.../api/datastream/BatchedDataStream.java | 238 +++++++++++++++++++
.../streaming/api/datastream/DataStream.java | 197 ++++++---------
.../api/datastream/GroupedDataStream.java | 8 +-
.../api/datastream/WindowDataStream.java | 87 +++++++
.../GroupedWindowGroupReduceInvokable.java | 4 +-
.../operator/GroupedWindowReduceInvokable.java | 19 +-
.../operator/WindowGroupReduceInvokable.java | 12 +-
.../operator/WindowReduceInvokable.java | 18 +-
.../api/invokable/util/DefaultTimeStamp.java | 39 +++
.../api/invokable/util/DefaultTimestamp.java | 34 ---
.../streaming/api/invokable/util/TimeStamp.java | 46 ++++
.../streaming/api/invokable/util/Timestamp.java | 38 ---
.../WindowGroupReduceInvokableTest.java | 19 +-
.../operator/WindowReduceInvokableTest.java | 18 +-
.../ml/IncrementalLearningSkeleton.java | 6 +-
16 files changed, 561 insertions(+), 272 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index 04e2f2e..6ed53df 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -242,42 +242,42 @@ Merges two or more `DataStream` instances creating a new DataStream containing a
dataStream.merge(otherStream1, otherStream2…)
```
+### Grouped operators
+
+Some transformations require that the `DataStream` is grouped on some key value. The user can create a `GroupedDataStream` by calling the `groupBy(keyPosition)` method of a non-grouped `DataStream`. The user can apply different reduce transformations on the obtained `GroupedDataStream`:
+
+#### Reduce on GroupedDataStream
+When the reduce operator is applied on a grouped data stream, the user-defined `ReduceFunction` will combine subsequent pairs of elements having the same key value. The combined results are sent to the output stream.
+
+### Aggregations
+
+The Flink streaming API supports different types of aggregation operators similarly to the core API. For grouped data streams the aggregations work in a grouped fashion.
+
+Types of aggregations: `sum(fieldPosition)`, `min(fieldPosition)`, `max(fieldPosition)`
+
+For every incoming tuple the selected field is replaced with the current aggregated value. If the aggregations are used without defining field position, 0 is used as default.
+
### Window/Batch operators
Window and batch operators allow the user to execute function on slices or windows of the DataStream in a sliding fashion. If the stepsize for the slide is not defined then the window/batchsize is used as stepsize by default.
-#### Window reduce
-The transformation calls a user-defined `GroupReduceFunction` on records received during the predefined time window. The window is shifted after each reduce call.
-A window reduce that sums the elements in the last minute with 10 seconds stepsize:
+When applied to grouped data streams, the operators are executed separately on each group of elements sharing the selected key position.
-```java
-dataStream.windowReduce(new GroupReduceFunction<Integer, Integer>() {
- @Override
- public void reduce(Iterable<Integer> values, Collector<Integer> out) throws Exception {
- Integer sum = 0;
- for(Integer val: values){
- sum+=val;
- }
- }
- }, 60000, 10000);
-```
+#### Reduce on windowed/batched data streams
+The transformation calls a user-defined `ReduceFunction` on records received in the batch or during the predefined time window. The window is shifted after each reduce call. The user can also use the different streaming aggregations.
-#### Batch reduce
-The transformation calls a `GroupReduceFunction` for each data batch of the predefined size. The batch slides by the predefined number of elements after each call. Works similarly to window reduce.
+A window reduce that sums the elements in the last minute with 10 seconds slide interval:
```java
-dataStream.batchReduce(reducer, batchSize, slideSize)
+dataStream.window(60000, 10000).sum();
```
-### Grouped operators
-
-Some transformations require that the `DataStream` is grouped on some key value. The user can create a `GroupedDataStream` by calling the `groupBy(keyPosition)` method of a non-grouped `DataStream`. The user can apply different reduce transformations on the obtained `GroupedDataStream`:
-
-#### Reduce on GroupedDataStream
-When the reduce operator is applied on a grouped data stream, the user-defined `ReduceFunction` will combine subsequent pairs of elements having the same key value. The combined results are sent to the output stream.
+#### ReduceGroup on windowed/batched data streams
+The transformation calls a `GroupReduceFunction` for each data batch or data window. The batch/window slides by the predefined number of elements/time after each call.
-#### Window/Batchreduce on GroupedDataStream
-Similarly to the grouped reduce operator the window and batch reduce operators work the same way as in the non-grouped case except that in a data window/batch every `GroupReduceFunction` call will receive data elements for only the same keys.
+```java
+dataStream.batch(1000, 100).reduceGroup(reducer);
+```
### Co operators
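Editorial sketch (not part of the commit): the sliding behaviour described in the guide above, a batch of a given size that slides by a given step, with the step defaulting to the batch size, can be illustrated in plain Java without any Flink dependency. The class `SlidingBatchSketch` and its methods are hypothetical names chosen for this illustration, and the exact emission rule (emit once the batch is full, then drop `slideSize` elements) is an assumption about the semantics, not a description of `BatchReduceInvokable`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration only: plain Java, not the Flink streaming API.
final class SlidingBatchSketch {

    // Sums every full batch of batchSize elements, then slides forward by slideSize elements.
    static List<Integer> slidingSums(List<Integer> input, int batchSize, int slideSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("Batch size must be positive");
        }
        if (slideSize < 1) {
            throw new IllegalArgumentException("Slide size must be positive");
        }
        List<Integer> out = new ArrayList<>();
        Deque<Integer> buffer = new ArrayDeque<>();
        int toSkip = 0; // elements still to drop when slideSize exceeds batchSize
        for (Integer value : input) {
            if (toSkip > 0) {
                toSkip--;
                continue;
            }
            buffer.addLast(value);
            if (buffer.size() == batchSize) {
                int sum = 0;
                for (int v : buffer) {
                    sum += v;
                }
                out.add(sum);
                // Slide: drop the oldest slideSize elements (assumed semantics).
                for (int i = 0; i < slideSize; i++) {
                    if (!buffer.isEmpty()) {
                        buffer.removeFirst();
                    } else {
                        toSkip++;
                    }
                }
            }
        }
        return out;
    }

    // Mirrors the documented default: the step equals the batch size when not given.
    static List<Integer> slidingSums(List<Integer> input, int batchSize) {
        return slidingSums(input, batchSize, batchSize);
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(slidingSums(input, 3, 1)); // prints [6, 9, 12, 15]
        System.out.println(slidingSums(input, 3));    // prints [6, 15]
    }
}
```

With a step of 1 the batches overlap, so each element contributes to up to three sums; with the default step the batches are disjoint.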
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
new file mode 100755
index 0000000..0aa5de6
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.RichReduceFunction;
+import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction;
+import org.apache.flink.streaming.api.invokable.operator.BatchGroupReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.GroupedBatchGroupReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.GroupedBatchReduceInvokable;
+import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
+import org.apache.flink.types.TypeInformation;
+
+/**
+ * A {@link BatchedDataStream} represents a data stream whose elements are
+ * batched together in a sliding batch. Operations like
+ * {@link #reduce(ReduceFunction)} or {@link #reduceGroup(GroupReduceFunction)}
+ * are applied for each batch and the batch is slid afterwards.
+ *
+ * @param <OUT>
+ * The output type of the {@link BatchedDataStream}
+ */
+public class BatchedDataStream<OUT> {
+
+ protected DataStream<OUT> dataStream;
+ protected boolean isGrouped;
+ protected int keyPosition;
+ protected long batchSize;
+ protected long slideSize;
+
+ protected BatchedDataStream(DataStream<OUT> dataStream, long batchSize, long slideSize) {
+ if (dataStream instanceof GroupedDataStream) {
+ this.isGrouped = true;
+ this.keyPosition = ((GroupedDataStream<OUT>) dataStream).keyPosition;
+ } else {
+ this.isGrouped = false;
+ }
+ this.dataStream = dataStream.copy();
+ this.batchSize = batchSize;
+ this.slideSize = slideSize;
+ }
+
+ protected BatchedDataStream(BatchedDataStream<OUT> batchedDataStream) {
+ this.dataStream = batchedDataStream.dataStream.copy();
+ this.isGrouped = batchedDataStream.isGrouped;
+ this.keyPosition = batchedDataStream.keyPosition;
+ this.batchSize = batchedDataStream.batchSize;
+ this.slideSize = batchedDataStream.slideSize;
+ }
+
+ /**
+ * Groups the elements of the {@link BatchedDataStream} by the given key
+ * position to be used with grouped operators.
+ *
+ * @param keyPosition
+ * The position of the field on which the
+ * {@link BatchedDataStream} will be grouped.
+ * @return The transformed {@link BatchedDataStream}
+ */
+ public BatchedDataStream<OUT> groupBy(int keyPosition) {
+ return new BatchedDataStream<OUT>(dataStream.groupBy(keyPosition), batchSize, slideSize);
+ }
+
+ /**
+ * Applies a reduce transformation on every sliding batch/window of the data
+ * stream. If the data stream is grouped then the reducer is applied on
+ * every group of elements sharing the same key. This type of reduce is much
+ * faster than reduceGroup since the reduce function can be applied
+ * incrementally. The user can also extend the {@link RichReduceFunction} to
+ * gain access to other features provided by the {@link RichFunction}
+ * interface.
+ *
+ * @param reducer
+ * The {@link ReduceFunction} that will be called for every
+ * element of the input values in the batch/window.
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
+ return dataStream.addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
+ ReduceFunction.class, 0), new FunctionTypeWrapper<OUT>(reducer,
+ ReduceFunction.class, 0), getReduceInvokable(reducer));
+ }
+
+ /**
+ * Applies a reduceGroup transformation on preset batches/windows of the
+ * DataStream. The transformation calls a {@link GroupReduceFunction} for
+ * each batch/window. Each GroupReduceFunction call can return any number of
+ * elements including none. The user can also extend
+ * {@link RichGroupReduceFunction} to gain access to other features provided
+ * by the {@link RichFunction} interface.
+ *
+ * @param reducer
+ * The {@link GroupReduceFunction} that will be called for every
+ * batch/window.
+ * @return The transformed DataStream.
+ */
+ public <R> SingleOutputStreamOperator<R, ?> reduceGroup(GroupReduceFunction<OUT, R> reducer) {
+ return dataStream.addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
+ GroupReduceFunction.class, 0), new FunctionTypeWrapper<R>(reducer, GroupReduceFunction.class,
+ 1), getGroupReduceInvokable(reducer));
+ }
+
+ /**
+ * Applies an aggregation that sums every sliding batch/window of the data
+ * stream at the given position.
+ *
+ * @param positionToSum
+ * The position in the data point to sum
+ * @return The transformed DataStream.
+ */
+ @SuppressWarnings("unchecked")
+ public SingleOutputStreamOperator<OUT, ?> sum(int positionToSum) {
+ dataStream.checkFieldRange(positionToSum);
+ return aggregate((AggregationFunction<OUT>) SumAggregationFunction.getSumFunction(
+ positionToSum, dataStream.getClassAtPos(positionToSum)));
+ }
+
+ /**
+ * Syntactic sugar for sum(0)
+ *
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<OUT, ?> sum() {
+ return sum(0);
+ }
+
+ /**
+ * Applies an aggregation that gives the minimum of every sliding
+ * batch/window of the data stream at the given position.
+ *
+ * @param positionToMin
+ * The position in the data point to minimize
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<OUT, ?> min(int positionToMin) {
+ dataStream.checkFieldRange(positionToMin);
+ return aggregate(new MinAggregationFunction<OUT>(positionToMin));
+ }
+
+ /**
+ * Syntactic sugar for min(0)
+ *
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<OUT, ?> min() {
+ return min(0);
+ }
+
+ /**
+ * Applies an aggregation that gives the maximum of every sliding
+ * batch/window of the data stream at the given position.
+ *
+ * @param positionToMax
+ * The position in the data point to maximize
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<OUT, ?> max(int positionToMax) {
+ dataStream.checkFieldRange(positionToMax);
+ return aggregate(new MaxAggregationFunction<OUT>(positionToMax));
+ }
+
+ /**
+ * Syntactic sugar for max(0)
+ *
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<OUT, ?> max() {
+ return max(0);
+ }
+
+ /**
+ * Gets the output type.
+ *
+ * @return The output type.
+ */
+ public TypeInformation<OUT> getOutputType() {
+ return dataStream.getOutputType();
+ }
+
+ private SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
+ BatchReduceInvokable<OUT> invokable = getReduceInvokable(aggregate);
+
+ SingleOutputStreamOperator<OUT, ?> returnStream = dataStream.addFunction("batchReduce",
+ aggregate, null, null, invokable);
+
+ dataStream.jobGraphBuilder.setTypeWrappersFrom(dataStream.getId(), returnStream.getId());
+ return returnStream;
+ }
+
+ protected BatchReduceInvokable<OUT> getReduceInvokable(ReduceFunction<OUT> reducer) {
+ BatchReduceInvokable<OUT> invokable;
+ if (isGrouped) {
+ invokable = new GroupedBatchReduceInvokable<OUT>(reducer, batchSize, slideSize,
+ keyPosition);
+ } else {
+ invokable = new BatchReduceInvokable<OUT>(reducer, batchSize, slideSize);
+ }
+ return invokable;
+ }
+
+ protected <R> BatchGroupReduceInvokable<OUT, R> getGroupReduceInvokable(
+ GroupReduceFunction<OUT, R> reducer) {
+ BatchGroupReduceInvokable<OUT, R> invokable;
+ if (isGrouped) {
+ invokable = new GroupedBatchGroupReduceInvokable<OUT, R>(reducer, batchSize, slideSize,
+ keyPosition);
+ } else {
+ invokable = new BatchGroupReduceInvokable<OUT, R>(reducer, batchSize, slideSize);
+ }
+ return invokable;
+ }
+
+ protected BatchedDataStream<OUT> copy() {
+ return new BatchedDataStream<OUT>(this);
+ }
+
+}
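Editorial sketch (not part of the commit): the Javadoc above notes that `reduce` can be applied incrementally while `reduceGroup` receives a whole batch. The difference can be illustrated in plain Java, assuming non-overlapping batches for simplicity. `ReduceVsGroupReduceSketch` and its methods are hypothetical names, and `BinaryOperator` and `Function` stand in for Flink's `ReduceFunction` and `GroupReduceFunction`; none of this reflects the internals of the invokables in the commit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical illustration only: plain Java, not the Flink streaming API.
final class ReduceVsGroupReduceSketch {

    // Incremental reduce: each element is folded into a single running value,
    // so only one accumulator is kept per batch.
    static <T> List<T> batchReduce(List<T> input, int batchSize, BinaryOperator<T> reducer) {
        List<T> out = new ArrayList<>();
        T acc = null;
        int count = 0;
        for (T value : input) {
            acc = (count == 0) ? value : reducer.apply(acc, value);
            count++;
            if (count == batchSize) {
                out.add(acc);
                count = 0;
            }
        }
        return out;
    }

    // Group reduce: the whole batch is buffered and handed to the function,
    // which may emit any number of results, including none.
    static <T, R> List<R> batchReduceGroup(List<T> input, int batchSize,
            Function<List<T>, List<R>> reducer) {
        List<R> out = new ArrayList<>();
        List<T> buffer = new ArrayList<>();
        for (T value : input) {
            buffer.add(value);
            if (buffer.size() == batchSize) {
                out.addAll(reducer.apply(new ArrayList<>(buffer)));
                buffer.clear();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4);
        // One sum per batch of two elements.
        System.out.println(batchReduce(input, 2, Integer::sum)); // prints [3, 7]
        // Emits the batch size for each batch; a function could equally return an empty list.
        List<Integer> sizes = ReduceVsGroupReduceSketch.<Integer, Integer>batchReduceGroup(
                input, 2, batch -> Collections.singletonList(batch.size()));
        System.out.println(sizes); // prints [2, 2]
    }
}
```

The incremental variant needs constant memory per batch, which is one plausible reason for the speed difference the Javadoc mentions; the buffered variant trades that for the freedom to inspect all elements at once.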
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 70348d6..bebda91 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -26,12 +26,10 @@ import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.RichFilterFunction;
import org.apache.flink.api.java.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.functions.RichMapFunction;
import org.apache.flink.api.java.functions.RichReduceFunction;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -49,14 +47,12 @@ import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByBatches;
import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByMillis;
import org.apache.flink.streaming.api.invokable.SinkInvokable;
import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
-import org.apache.flink.streaming.api.invokable.operator.BatchGroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.WindowGroupReduceInvokable;
-import org.apache.flink.streaming.api.invokable.util.DefaultTimestamp;
-import org.apache.flink.streaming.api.invokable.util.Timestamp;
+import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.partitioner.DistributePartitioner;
import org.apache.flink.streaming.partitioner.FieldsPartitioner;
@@ -385,160 +381,113 @@ public class DataStream<OUT> {
ReduceFunction.class, 0), new StreamReduceInvokable<OUT>(reducer));
}
- public GroupedDataStream<OUT> groupBy(int keyPosition) {
- return new GroupedDataStream<OUT>(this, keyPosition);
- }
-
/**
- * Applies a reduce transformation on preset chunks of the DataStream. The
- * transformation calls a {@link GroupReduceFunction} for each tuple batch
- * of the predefined size. Each GroupReduceFunction call can return any
- * number of elements including none. The user can also extend
- * {@link RichGroupReduceFunction} to gain access to other features provided
- * by the {@link RichFuntion} interface.
+ * Groups the elements of a {@link DataStream} by the given key position to
+ * be used with grouped operators like
+ * {@link GroupedDataStream#reduce(ReduceFunction)}
*
- *
- * @param reducer
- * The GroupReduceFunction that is called for each tuple batch.
- * @param batchSize
- * The number of tuples grouped together in the batch.
- * @param <R>
- * output type
- * @return The transformed {@link DataStream}.
+ * @param keyPosition
+ * The position of the field on which the {@link DataStream} will
+ * be grouped.
+ * @return The transformed {@link DataStream}
*/
- public <R> SingleOutputStreamOperator<R, ?> batchReduce(GroupReduceFunction<OUT, R> reducer,
- long batchSize) {
- return batchReduce(reducer, batchSize, batchSize);
+ public GroupedDataStream<OUT> groupBy(int keyPosition) {
+ return new GroupedDataStream<OUT>(this, keyPosition);
}
/**
- * Applies a reduce transformation on preset sliding chunks of the
- * DataStream. The transformation calls a {@link GroupReduceFunction} for
- * each tuple batch of the predefined size. The tuple batch gets slid by the
- * given number of tuples. Each GroupReduceFunction call can return any
- * number of elements including none. The user can also extend
- * {@link RichGroupReduceFunction} to gain access to other features provided
- * by the {@link RichFuntion} interface.
- *
+ * Collects the data stream elements into sliding batches creating a new
+ * {@link BatchedDataStream}. The user can apply transformations like
+ * {@link BatchedDataStream#reduce}, {@link BatchedDataStream#reduceGroup}
+ * or aggregations on the {@link BatchedDataStream}.
*
- * @param reducer
- * The GroupReduceFunction that is called for each tuple batch.
* @param batchSize
- * The number of tuples grouped together in the batch.
+ * The number of elements in each batch at each operator
* @param slideSize
- * The number of tuples the batch is slid by.
- * @param <R>
- * output type
- * @return The transformed {@link DataStream}.
+ * The number of elements with which the batches are slid by
+ * after each transformation.
+ * @return The transformed {@link DataStream}
*/
- public <R> SingleOutputStreamOperator<R, ?> batchReduce(GroupReduceFunction<OUT, R> reducer,
- long batchSize, long slideSize) {
+ public BatchedDataStream<OUT> batch(long batchSize, long slideSize) {
if (batchSize < 1) {
throw new IllegalArgumentException("Batch size must be positive");
}
if (slideSize < 1) {
throw new IllegalArgumentException("Slide size must be positive");
}
-
- FunctionTypeWrapper<OUT> inTypeWrapper = new FunctionTypeWrapper<OUT>(reducer,
- GroupReduceFunction.class, 0);
- FunctionTypeWrapper<R> outTypeWrapper = new FunctionTypeWrapper<R>(reducer,
- GroupReduceFunction.class, 1);
-
- return addFunction("batchReduce", reducer, inTypeWrapper, outTypeWrapper,
- new BatchGroupReduceInvokable<OUT, R>(reducer, batchSize, slideSize));
+ return new BatchedDataStream<OUT>(this, batchSize, slideSize);
}
/**
- * Applies a reduce transformation on preset "time" chunks of the
- * DataStream. The transformation calls a {@link GroupReduceFunction} on
- * records received during the predefined time window. The window is shifted
- * after each reduce call. Each GroupReduceFunction call can return any
- * number of elements including none.The user can also extend
- * {@link RichGroupReduceFunction} to gain access to other features provided
- * by the {@link RichFuntion} interface.
- *
+ * Collects the data stream elements into sliding batches creating a new
+ * {@link BatchedDataStream}. The user can apply transformations like
+ * {@link BatchedDataStream#reduce}, {@link BatchedDataStream#reduceGroup}
+ * or aggregations on the {@link BatchedDataStream}.
*
- * @param reducer
- * The GroupReduceFunction that is called for each time window.
- * @param windowSize
- * SingleOutputStreamOperator The time window to run the reducer
- * on, in milliseconds.
- * @param <R>
- * output type
- * @return The transformed DataStream.
- */
- public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
- long windowSize) {
- return windowReduce(reducer, windowSize, windowSize);
- }
-
- /**
- * Applies a reduce transformation on preset "time" chunks of the
- * DataStream. The transformation calls a {@link GroupReduceFunction} on
- * records received during the predefined time window. The window is shifted
- * after each reduce call. Each GroupReduceFunction call can return any
- * number of elements including none.The user can also extend
- * {@link RichGroupReduceFunction} to gain access to other features provided
- * by the {@link RichFuntion} interface.
- *
- *
- * @param reducer
- * The GroupReduceFunction that is called for each time window.
- * @param windowSize
- * SingleOutputStreamOperator The time window to run the reducer
- * on, in milliseconds.
- * @param slideInterval
- * The time interval, batch is slid by.
- * @param <R>
- * output type
- * @return The transformed DataStream.
+ * @param batchSize
+ * The number of elements in each batch at each operator
+ * @return The transformed {@link DataStream}
*/
- public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
- long windowSize, long slideInterval) {
- return windowReduce(reducer, windowSize, slideInterval, new DefaultTimestamp<OUT>());
+ public BatchedDataStream<OUT> batch(long batchSize) {
+ return batch(batchSize, batchSize);
}
/**
- * Applies a reduce transformation on preset "time" chunks of the
- * DataStream. The transformation calls a {@link GroupReduceFunction} on
- * records received during the predefined time window. The window is shifted
- * after each reduce call. Each GroupReduceFunction call can return any
- * number of elements including none. The time is determined by a
- * user-defined timestamp. The user can also extend
- * {@link RichGroupReduceFunction} to gain access to other features provided
- * by the {@link RichFuntion} interface.
+ * Collects the data stream elements into sliding windows creating a new
+ * {@link WindowDataStream}. The user can apply transformations like
+ * {@link WindowDataStream#reduce}, {@link WindowDataStream#reduceGroup} or
+ * aggregations on the {@link WindowDataStream}.
*
- *
- * @param reducer
- * The GroupReduceFunction that is called for each time window.
* @param windowSize
- * SingleOutputStreamOperator The time window to run the reducer
- * on, in milliseconds.
+ * The length of the window in milliseconds.
* @param slideInterval
- * The time interval, batch is slid by.
+ * The number of milliseconds with which the windows are slid by
+ * after each transformation.
* @param timestamp
- * Timestamp function to retrieve a timestamp from an element.
- * @param <R>
- * output type
- * @return The transformed DataStream.
+ * User defined function for extracting time-stamps from each
+ * element
+ * @return The transformed {@link DataStream}
*/
- public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
- long windowSize, long slideInterval, Timestamp<OUT> timestamp) {
+ public WindowDataStream<OUT> window(long windowSize, long slideInterval,
+ TimeStamp<OUT> timestamp) {
if (windowSize < 1) {
throw new IllegalArgumentException("Window size must be positive");
}
if (slideInterval < 1) {
throw new IllegalArgumentException("Slide interval must be positive");
}
+ return new WindowDataStream<OUT>(this, windowSize, slideInterval, timestamp);
+ }
- FunctionTypeWrapper<OUT> inTypeWrapper = new FunctionTypeWrapper<OUT>(reducer,
- GroupReduceFunction.class, 0);
- FunctionTypeWrapper<R> outTypeWrapper = new FunctionTypeWrapper<R>(reducer,
- GroupReduceFunction.class, 1);
+ /**
+ * Collects the data stream elements into sliding windows creating a new
+ * {@link WindowDataStream}. The user can apply transformations like
+ * {@link WindowDataStream#reduce}, {@link WindowDataStream#reduceGroup} or
+ * aggregations on the {@link WindowDataStream}.
+ *
+ * @param windowSize
+ * The length of the window in milliseconds.
+ * @param slideInterval
+ * The number of milliseconds with which the windows are slid by
+ * after each transformation.
+ * @return The transformed {@link DataStream}
+ */
+ public WindowDataStream<OUT> window(long windowSize, long slideInterval) {
+ return window(windowSize, slideInterval, new DefaultTimeStamp<OUT>());
+ }
- return addFunction("batchReduce", reducer, inTypeWrapper, outTypeWrapper,
- new WindowGroupReduceInvokable<OUT, R>(reducer, windowSize, slideInterval, timestamp));
+ /**
+ * Collects the data stream elements into sliding windows creating a new
+ * {@link WindowDataStream}. The user can apply transformations like
+ * {@link WindowDataStream#reduce}, {@link WindowDataStream#reduceGroup} or
+ * aggregations on the {@link WindowDataStream}.
+ *
+ * @param windowSize
+ * The length of the window in milliseconds.
+ * @return The transformed {@link DataStream}
+ */
+ public WindowDataStream<OUT> window(long windowSize) {
+ return window(windowSize, windowSize);
}
/**
@@ -1115,7 +1064,7 @@ public class DataStream<OUT> {
*
* @return The copy
*/
- protected DataStream<OUT> copy(){
+ protected DataStream<OUT> copy() {
return new DataStream<OUT>(this);
}
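Editorial sketch (not part of the commit): the `window` overloads above take a window length, an optional slide interval that defaults to the window length, and an optional timestamp extractor. A plain-Java illustration of time-based sliding windows follows. `SlidingTimeWindowSketch` is a hypothetical name, `ToLongFunction` stands in for the `TimeStamp` interface, and the choices made here (input already ordered by timestamp, first window starting at the first element's timestamp, half-open windows, empty windows skipped) are assumptions for illustration rather than the behaviour of `WindowReduceInvokable`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.ToLongFunction;

// Hypothetical illustration only: plain Java, not the Flink streaming API.
final class SlidingTimeWindowSketch {

    // Collects elements into windows [start, start + windowSize), advancing start by slideInterval.
    // Assumes the input is ordered by timestamp.
    static <T> List<List<T>> windows(List<T> input, long windowSize, long slideInterval,
            ToLongFunction<T> timestamp) {
        if (windowSize < 1) {
            throw new IllegalArgumentException("Window size must be positive");
        }
        if (slideInterval < 1) {
            throw new IllegalArgumentException("Slide interval must be positive");
        }
        List<List<T>> out = new ArrayList<>();
        if (input.isEmpty()) {
            return out;
        }
        long first = timestamp.applyAsLong(input.get(0));
        long last = timestamp.applyAsLong(input.get(input.size() - 1));
        for (long start = first; start <= last; start += slideInterval) {
            List<T> window = new ArrayList<>();
            for (T element : input) {
                long ts = timestamp.applyAsLong(element);
                if (ts >= start && ts < start + windowSize) {
                    window.add(element);
                }
            }
            if (!window.isEmpty()) {
                out.add(window); // empty windows are skipped (assumed behaviour)
            }
        }
        return out;
    }

    // Mirrors the overload chain in the diff: the slide defaults to the window size.
    static <T> List<List<T>> windows(List<T> input, long windowSize, ToLongFunction<T> timestamp) {
        return windows(input, windowSize, windowSize, timestamp);
    }

    public static void main(String[] args) {
        // Each element doubles as its own timestamp in milliseconds.
        List<Long> input = Arrays.asList(0L, 5L, 10L);
        System.out.println(windows(input, 10, 5, Long::longValue)); // prints [[0, 5], [5, 10], [10]]
        System.out.println(windows(input, 10, Long::longValue));    // prints [[0, 5], [10]]
    }
}
```

A reduce or aggregation would then be applied to each collected window; the scan over the full input per window keeps the sketch short at the cost of efficiency.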
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index e513f2d..138a6f8 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -25,8 +25,8 @@ import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.invokable.operator.GroupedBatchGroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.GroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.GroupedWindowGroupReduceInvokable;
-import org.apache.flink.streaming.api.invokable.util.DefaultTimestamp;
-import org.apache.flink.streaming.api.invokable.util.Timestamp;
+import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
@@ -166,7 +166,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
*/
public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
long windowSize, long slideInterval) {
- return windowReduce(reducer, windowSize, slideInterval, new DefaultTimestamp<OUT>());
+ return windowReduce(reducer, windowSize, slideInterval, new DefaultTimeStamp<OUT>());
}
/**
@@ -191,7 +191,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
* @return The transformed DataStream.
*/
public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
- long windowSize, long slideInterval, Timestamp<OUT> timestamp) {
+ long windowSize, long slideInterval, TimeStamp<OUT> timestamp) {
return addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
GroupReduceFunction.class, 0), new FunctionTypeWrapper<R>(reducer,
GroupReduceFunction.class, 1), new GroupedWindowGroupReduceInvokable<OUT, R>(reducer,
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java
new file mode 100755
index 0000000..4756050
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.api.invokable.operator.BatchGroupReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.GroupedWindowGroupReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.GroupedWindowReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.WindowGroupReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+
+/**
+ * A {@link WindowDataStream} represents a data stream whose elements are
+ * batched together in a sliding window. Operations like
+ * {@link #reduce(ReduceFunction)} or {@link #reduceGroup(GroupReduceFunction)}
+ * are applied for each window and the window is slid afterwards.
+ *
+ * @param <OUT>
+ * The output type of the {@link WindowDataStream}
+ */
+public class WindowDataStream<OUT> extends BatchedDataStream<OUT> {
+
+ TimeStamp<OUT> timeStamp;
+
+ protected WindowDataStream(DataStream<OUT> dataStream, long windowSize, long slideInterval,
+ TimeStamp<OUT> timeStamp) {
+ super(dataStream, windowSize, slideInterval);
+ this.timeStamp = timeStamp;
+ }
+
+ protected WindowDataStream(WindowDataStream<OUT> windowDataStream) {
+ super(windowDataStream);
+ this.timeStamp = windowDataStream.timeStamp;
+ }
+
+ public WindowDataStream<OUT> groupBy(int keyPosition) {
+ return new WindowDataStream<OUT>(dataStream.groupBy(keyPosition), batchSize, slideSize,
+ timeStamp);
+ }
+
+ protected BatchReduceInvokable<OUT> getReduceInvokable(ReduceFunction<OUT> reducer) {
+ BatchReduceInvokable<OUT> invokable;
+ if (isGrouped) {
+ invokable = new GroupedWindowReduceInvokable<OUT>(reducer, batchSize, slideSize,
+ keyPosition, timeStamp);
+ } else {
+ invokable = new WindowReduceInvokable<OUT>(reducer, batchSize, slideSize, timeStamp);
+ }
+ return invokable;
+ }
+
+ protected <R> BatchGroupReduceInvokable<OUT, R> getGroupReduceInvokable(
+ GroupReduceFunction<OUT, R> reducer) {
+ BatchGroupReduceInvokable<OUT, R> invokable;
+ if (isGrouped) {
+ invokable = new GroupedWindowGroupReduceInvokable<OUT, R>(reducer, batchSize,
+ slideSize, keyPosition, timeStamp);
+ } else {
+ invokable = new WindowGroupReduceInvokable<OUT, R>(reducer, batchSize, slideSize,
+ timeStamp);
+ }
+ return invokable;
+ }
+
+ public WindowDataStream<OUT> copy() {
+ return new WindowDataStream<OUT>(this);
+ }
+
+}
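
The class comment above says that a reduce is applied to each window and the window is then slid. The following is a minimal sketch of that idea in plain Java, not the invokable code from this commit. `SlidingWindowSketch` and `windows` are hypothetical names, and the half-open boundary `[start, start + size)` is an assumption: the actual invokables process records incrementally and may treat boundaries differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Flink: shows which timestamps fall into each sliding window.
final class SlidingWindowSketch {

    // Groups non-decreasing timestamps into windows [ws, ws + size), moving ws forward by slide.
    // Windows that contain no element are returned as empty lists.
    static List<List<Long>> windows(List<Long> timestamps, long start, long size, long slide) {
        if (size <= 0 || slide <= 0) {
            throw new IllegalArgumentException("size and slide must be positive");
        }
        List<List<Long>> result = new ArrayList<List<Long>>();
        if (timestamps.isEmpty()) {
            return result;
        }
        long last = timestamps.get(timestamps.size() - 1);
        for (long ws = start; ws <= last; ws += slide) {
            List<Long> window = new ArrayList<Long>();
            for (long ts : timestamps) {
                if (ts >= ws && ts < ws + size) {
                    window.add(ts);
                }
            }
            result.add(window);
        }
        return result;
    }

    public static void main(String[] args) {
        // Window size 4, slide interval 2, start time 0.
        System.out.println(windows(Arrays.asList(0L, 1L, 2L, 3L, 4L, 5L), 0L, 4L, 2L));
        // prints [[0, 1, 2, 3], [2, 3, 4, 5], [4, 5]]
    }
}
```

Because the slide interval is smaller than the window size, consecutive windows overlap and an element can be reduced more than once.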
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokable.java
index 4027b78..865dced 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokable.java
@@ -22,7 +22,7 @@ import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.streaming.api.invokable.util.Timestamp;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.state.MutableTableState;
@@ -33,7 +33,7 @@ public class GroupedWindowGroupReduceInvokable<IN, OUT> extends WindowGroupReduc
private MutableTableState<Object, List<IN>> values;
public GroupedWindowGroupReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long windowSize,
- long slideInterval, int keyPosition, Timestamp<IN> timestamp) {
+ long slideInterval, int keyPosition, TimeStamp<IN> timestamp) {
super(reduceFunction, windowSize, slideInterval, timestamp);
this.keyPosition = keyPosition;
this.reducer = reduceFunction;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowReduceInvokable.java
index e202e86..df94843 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowReduceInvokable.java
@@ -18,33 +18,26 @@
package org.apache.flink.streaming.api.invokable.operator;
import java.io.IOException;
-import java.util.Map;
import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.api.invokable.util.Timestamp;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.state.SlidingWindowState;
public class GroupedWindowReduceInvokable<OUT> extends GroupedBatchReduceInvokable<OUT> {
private static final long serialVersionUID = 1L;
- protected transient SlidingWindowState<Map<Object, OUT>> state;
- private Timestamp<OUT> timestamp;
+ private TimeStamp<OUT> timestamp;
private long startTime;
private long nextRecordTime;
public GroupedWindowReduceInvokable(ReduceFunction<OUT> reduceFunction, long windowSize,
- long slideInterval, Timestamp<OUT> timestamp, int keyPosition) {
+ long slideInterval, int keyPosition, TimeStamp<OUT> timestamp) {
super(reduceFunction, windowSize, slideInterval, keyPosition);
this.timestamp = timestamp;
+ this.startTime = timestamp.getStartTime();
}
-
- @Override
- protected void initializeAtFirstRecord() {
- startTime = nextRecordTime - (nextRecordTime % granularity);
- }
-
+
@Override
protected StreamRecord<OUT> getNextRecord() throws IOException {
reuse = recordIterator.next(reuse);
@@ -53,7 +46,7 @@ public class GroupedWindowReduceInvokable<OUT> extends GroupedBatchReduceInvokab
}
return reuse;
}
-
+
@Override
protected boolean batchNotFull() {
if (nextRecordTime < startTime + granularity) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
index 7b4317a..03c19d4 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
@@ -20,24 +20,20 @@ package org.apache.flink.streaming.api.invokable.operator;
import java.io.IOException;
import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.streaming.api.invokable.util.Timestamp;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
public class WindowGroupReduceInvokable<IN, OUT> extends BatchGroupReduceInvokable<IN, OUT> {
private static final long serialVersionUID = 1L;
private long startTime;
private long nextRecordTime;
- private Timestamp<IN> timestamp;
+ private TimeStamp<IN> timestamp;
public WindowGroupReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long windowSize,
- long slideInterval, Timestamp<IN> timestamp) {
+ long slideInterval, TimeStamp<IN> timestamp) {
super(reduceFunction, windowSize, slideInterval);
this.timestamp = timestamp;
- }
-
- @Override
- protected void initializeAtFirstRecord() {
- startTime = nextRecordTime - (nextRecordTime % granularity);
+ this.startTime = timestamp.getStartTime();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
index 0f13397..bd51c65 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
@@ -21,25 +21,21 @@ import java.io.IOException;
import java.util.Iterator;
import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.api.invokable.util.Timestamp;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
public class WindowReduceInvokable<OUT> extends BatchReduceInvokable<OUT> {
private static final long serialVersionUID = 1L;
private long startTime;
private long nextRecordTime;
- private Timestamp<OUT> timestamp;
+ private TimeStamp<OUT> timestamp;
private String nullElement = "nullElement";
public WindowReduceInvokable(ReduceFunction<OUT> reduceFunction, long windowSize,
- long slideInterval, Timestamp<OUT> timestamp) {
+ long slideInterval, TimeStamp<OUT> timestamp) {
super(reduceFunction, windowSize, slideInterval);
this.timestamp = timestamp;
- }
-
- @Override
- protected void initializeAtFirstRecord() {
- startTime = nextRecordTime - (nextRecordTime % granularity);
+ this.startTime = timestamp.getStartTime();
}
protected StreamRecord<OUT> getNextRecord() throws IOException {
@@ -59,7 +55,7 @@ public class WindowReduceInvokable<OUT> extends BatchReduceInvokable<OUT> {
return false;
}
}
-
+
@Override
protected void collectOneUnit() throws Exception {
OUT reduced = null;
@@ -71,9 +67,9 @@ public class WindowReduceInvokable<OUT> extends BatchReduceInvokable<OUT> {
resetReuse();
}
}
- if(reduced!=null){
+ if (reduced != null) {
state.pushBack(reduced);
- }else{
+ } else {
state.pushBack(nullElement);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimeStamp.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimeStamp.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimeStamp.java
new file mode 100644
index 0000000..b6186e1
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimeStamp.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.util;
+
+/**
+ * Default timestamp function that uses the Java System.currentTimeMillis()
+ * method to retrieve a timestamp.
+ *
+ * @param <T>
+ * Type of the inputs of the reducing function.
+ */
+public class DefaultTimeStamp<T> implements TimeStamp<T> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public long getTimestamp(T value) {
+ return System.currentTimeMillis();
+ }
+
+ @Override
+ public long getStartTime() {
+ return System.currentTimeMillis();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimestamp.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimestamp.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimestamp.java
deleted file mode 100644
index 8276a01..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimestamp.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.util;
-
-/**
- * Default timestamp function that uses the Java System.currentTimeMillis()
- * method to retrieve a timestamp.
- *
- * @param <T>
- * Type of the inputs of the reducing function.
- */
-public class DefaultTimestamp<T> implements Timestamp<T> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public long getTimestamp(T value) {
- return System.currentTimeMillis();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/TimeStamp.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/TimeStamp.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/TimeStamp.java
new file mode 100644
index 0000000..27447d7
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/TimeStamp.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.util;
+
+import java.io.Serializable;
+
+/**
+ * Interface for getting a timestamp from a custom value. Used in window
+ * reduces. In order to work properly, the timestamps must be non-decreasing.
+ *
+ * @param <T>
+ * Type of the value to create the timestamp from.
+ */
+public interface TimeStamp<T> extends Serializable {
+
+ /**
+ * Creates a timestamp from the given value.
+ *
+ * @param value
+ * The value to create the timestamp from
+ * @return The timestamp
+ */
+ public long getTimestamp(T value);
+
+ /**
+ * Function to define the starting time for reference
+ *
+ * @return The starting timestamp
+ */
+ public long getStartTime();
+}
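
As a hedged illustration of how the interface above might be implemented, the sketch below reads the timestamp from a field of the record instead of the wall clock. To stay compilable without Flink on the classpath, it declares a local stand-in `TimeStamp` with the same two methods. `SensorReading` and `SensorReadingTimeStamp` are hypothetical names that do not appear in this commit.

```java
import java.io.Serializable;

// Stand-in mirroring the two methods of the TimeStamp interface added in this commit.
interface TimeStamp<T> extends Serializable {
    long getTimestamp(T value);

    long getStartTime();
}

// Hypothetical record type that carries its own creation time in milliseconds.
final class SensorReading implements Serializable {
    private static final long serialVersionUID = 1L;

    final String sensorId;
    final long createdAtMillis;

    SensorReading(String sensorId, long createdAtMillis) {
        this.sensorId = sensorId;
        this.createdAtMillis = createdAtMillis;
    }
}

// Takes the timestamp from the record and uses a fixed, caller-supplied start time.
final class SensorReadingTimeStamp implements TimeStamp<SensorReading> {
    private static final long serialVersionUID = 1L;

    private final long startTime;

    SensorReadingTimeStamp(long startTime) {
        this.startTime = startTime;
    }

    @Override
    public long getTimestamp(SensorReading value) {
        return value.createdAtMillis;
    }

    @Override
    public long getStartTime() {
        return startTime;
    }
}
```

In real code the class would implement `org.apache.flink.streaming.api.invokable.util.TimeStamp` and be passed as the last argument of `windowReduce(reducer, windowSize, slideInterval, timestamp)`. As the interface comment notes, the records must arrive with non-decreasing timestamps for the windows to work properly.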
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/Timestamp.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/Timestamp.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/Timestamp.java
deleted file mode 100644
index 91758e8..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/Timestamp.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.util;
-
-import java.io.Serializable;
-
-/**
- * Interface for getting a timestamp from a custom value. Used in window
- * reduces. In order to work properly, the timestamps must be non-decreasing.
- *
- * @param <T>
- * Type of the value to create the timestamp from.
- */
-public interface Timestamp<T> extends Serializable {
-
- /**
- * Values
- * @param value
- * The value to create the timestamp from
- * @return The timestamp
- */
- public long getTimestamp(T value);
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokableTest.java
index 7437bec..097e391 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokableTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokableTest.java
@@ -25,7 +25,7 @@ import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.streaming.api.invokable.util.Timestamp;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.util.MockInvokable;
import org.apache.flink.util.Collector;
import org.junit.Before;
@@ -45,13 +45,15 @@ public class WindowGroupReduceInvokableTest {
}
}
- public static final class MyTimestamp implements Timestamp<Integer> {
+ public static final class MyTimestamp implements TimeStamp<Integer> {
private static final long serialVersionUID = 1L;
private Iterator<Long> timestamps;
+ private long start;
public MyTimestamp(List<Long> timestamps) {
this.timestamps = timestamps.iterator();
+ this.start = timestamps.get(0);
}
@Override
@@ -59,6 +61,11 @@ public class WindowGroupReduceInvokableTest {
long ts = timestamps.next();
return ts;
}
+
+ @Override
+ public long getStartTime() {
+ return start;
+ }
}
private final static String EOW = "|";
@@ -81,16 +88,16 @@ public class WindowGroupReduceInvokableTest {
slideSize = 5;
timestamps = Arrays.asList(101L, 103L, 121L, 122L, 123L, 124L, 180L, 181L, 185L, 190L);
expectedResults.add(Arrays.asList("1", "2", EOW, EOW, EOW, "3", "4", "5", "6", EOW, "3",
- "4", "5", "6", EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, "7", "8",
- EOW, "7", "8", "9", EOW, "9", "10", EOW));
+ "4", "5", "6", EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, "7", EOW, "7",
+ "8", "9", EOW, "8", "9", "10", EOW));
invokables.add(new WindowGroupReduceInvokable<Integer, String>(new MySlidingWindowReduce(),
windowSize, slideSize, new MyTimestamp(timestamps)));
windowSize = 10;
slideSize = 4;
timestamps = Arrays.asList(101L, 103L, 110L, 112L, 113L, 114L, 120L, 121L, 125L, 130L);
- expectedResults.add(Arrays.asList("1", "2", EOW, "3", "4", "5", EOW, "3", "4", "5", "6",
- EOW, "4", "5", "6", "7", "8", EOW, "7", "8", "9", EOW, "7", "8", "9", EOW, "9",
+ expectedResults.add(Arrays.asList("1", "2", "3", EOW, "3", "4", "5", "6", EOW, "3", "4", "5", "6",
+ EOW, "5", "6", "7", "8", EOW, "7", "8", "9", EOW, "8", "9",
"10", EOW));
invokables.add(new WindowGroupReduceInvokable<Integer, String>(new MySlidingWindowReduce(),
windowSize, slideSize, new MyTimestamp(timestamps)));
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java
index 1aed25f..ff0951d 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java
@@ -25,7 +25,7 @@ import java.util.List;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.invokable.util.Timestamp;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.util.MockInvokable;
import org.junit.Test;
@@ -52,13 +52,18 @@ public class WindowReduceInvokableTest {
public Integer reduce(Integer value1, Integer value2) throws Exception {
return value1 + value2;
}
- }, 4, 2, new Timestamp<Integer>() {
+ }, 4, 2, new TimeStamp<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public long getTimestamp(Integer value) {
return value;
}
+
+ @Override
+ public long getStartTime() {
+ return 0;
+ }
});
List<Integer> expected = new ArrayList<Integer>();
@@ -86,14 +91,19 @@ public class WindowReduceInvokableTest {
Tuple2<String, Integer> value2) throws Exception {
return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
}
- }, 3, 2, new Timestamp<Tuple2<String, Integer>>() {
+ }, 3, 2, 0, new TimeStamp<Tuple2<String, Integer>>() {
private static final long serialVersionUID = 1L;
@Override
public long getTimestamp(Tuple2<String, Integer> value) {
return value.f1;
}
- }, 0);
+
+ @Override
+ public long getStartTime() {
+ return 1;
+ }
+ });
List<Tuple2<String, Integer>> expected2 = new ArrayList<Tuple2<String, Integer>>();
expected2.add(new Tuple2<String, Integer>("a", 6));
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index d80b937..a433fd0 100755
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -119,12 +119,12 @@ public class IncrementalLearningSkeleton {
public static void main(String[] args) {
- StreamExecutionEnvironment env = StreamExecutionEnvironment
- .createLocalEnvironment(PARALLELISM).setBufferTimeout(1000);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
+ PARALLELISM).setBufferTimeout(1000);
// Build new model on every second of new data
DataStream<Double[]> model = env.addSource(new TrainingDataSource(), SOURCE_PARALLELISM)
- .windowReduce(new PartialModelBuilder(), 5000);
+ .window(5000).reduceGroup(new PartialModelBuilder());
// Use partial model for prediction
DataStream<Integer> prediction = env.addSource(new NewDataSource(), SOURCE_PARALLELISM)