You are viewing a plain text version of this content. The canonical link to it (originally a hyperlink on the word "here") was lost in this plain-text copy.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:26:19 UTC
[42/51] [abbrv] git commit: [streaming] Implemented sliding window and batchReduce
[streaming] Implemented sliding window and batchReduce
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e089959c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e089959c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e089959c
Branch: refs/heads/master
Commit: e089959c0e4bb3e08a95b1b6b9076935c87a5a02
Parents: 309727e
Author: ghermann <re...@gmail.com>
Authored: Thu Aug 7 20:33:50 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:23:43 2014 +0200
----------------------------------------------------------------------
.../streaming/api/datastream/DataStream.java | 41 +++++-
.../operator/BatchReduceInvokable.java | 135 ++++++++---------
.../operator/WindowReduceInvokable.java | 118 ++++++---------
.../streaming/state/SlidingWindowState.java | 42 ++++--
.../state/SlidingWindowStateIterator.java | 41 +++++-
.../api/invokable/operator/BatchReduceTest.java | 143 +++++++++++++++----
.../streaming/state/SlidingWindowStateTest.java | 77 ++++++++++
7 files changed, 396 insertions(+), 201 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e089959c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index bec55e0..531f43c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -326,7 +326,34 @@ public abstract class DataStream<OUT> {
int batchSize) {
return addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT, Tuple, R>(reducer,
GroupReduceFunction.class, 0, -1, 1), new BatchReduceInvokable<OUT, R>(reducer,
- batchSize));
+ batchSize, batchSize));
+ }
+
+ /**
+ * Applies a reduce transformation on preset sliding chunks of the
+ * DataStream. The transformation calls a {@link GroupReduceFunction} for
+ * each tuple batch of the predefined size. The tuple batch gets slid by the
+ * given number of tuples. Each GroupReduceFunction call can return any
+ * number of elements including none. The user can also extend
+ * {@link RichGroupReduceFunction} to gain access to other features provided
+ * by the {@link RichFuntion} interface.
+ *
+ *
+ * @param reducer
+ * The GroupReduceFunction that is called for each tuple batch.
+ * @param batchSize
+ * The number of tuples grouped together in the batch.
+ * @param slideSize
+ * The number of tuples the batch is slid by.
+ * @param <R>
+ * output type
+ * @return The transformed {@link DataStream}.
+ */
+ public <R> SingleOutputStreamOperator<R, ?> batchReduce(GroupReduceFunction<OUT, R> reducer,
+ int batchSize, int slideSize) {
+ return addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT, Tuple, R>(reducer,
+ GroupReduceFunction.class, 0, -1, 1), new BatchReduceInvokable<OUT, R>(reducer,
+ batchSize, slideSize));
}
/**
@@ -342,7 +369,8 @@ public abstract class DataStream<OUT> {
* @param reducer
* The GroupReduceFunction that is called for each time window.
* @param windowSize
- * The time window to run the reducer on, in milliseconds.
+ * SingleOutputStreamOperator The time window to run the reducer
+ * on, in milliseconds.
* @param <R>
* output type
* @return The transformed DataStream.
@@ -351,7 +379,14 @@ public abstract class DataStream<OUT> {
long windowSize) {
return addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT, Tuple, R>(reducer,
GroupReduceFunction.class, 0, -1, 1), new WindowReduceInvokable<OUT, R>(reducer,
- windowSize));
+ windowSize, windowSize, windowSize));
+ }
+
+ public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
+ long windowSize, long slideInterval, long timeUnitInMillis) {
+ return addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT, Tuple, R>(reducer,
+ GroupReduceFunction.class, 0, -1, 1), new WindowReduceInvokable<OUT, R>(reducer,
+ windowSize, slideInterval, timeUnitInMillis));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e089959c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
index 4e0a7a5..ffd4f1a 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
@@ -20,106 +20,87 @@
package org.apache.flink.streaming.api.invokable.operator;
import java.util.ArrayList;
-import java.util.List;
+import org.apache.commons.math.util.MathUtils;
import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.state.SlidingWindowState;
public class BatchReduceInvokable<IN, OUT> extends StreamReduceInvokable<IN, OUT> {
private static final long serialVersionUID = 1L;
private int batchSize;
-
- public BatchReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, int batchSize) {
- super(reduceFunction);
+ private int slideSize;
+ private int granularity;
+ private boolean emitted;
+ private transient SlidingWindowState<IN> state;
+
+ public BatchReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, int batchSize,
+ int slideSize) {
+ super(reduceFunction);
this.reducer = reduceFunction;
this.batchSize = batchSize;
+ this.slideSize = slideSize;
+ this.granularity = MathUtils.gcd(batchSize, slideSize);
}
@Override
protected void immutableInvoke() throws Exception {
- List<IN> tupleBatch = new ArrayList<IN>();
- boolean batchStart;
- int counter = 0;
-
- while (loadNextRecord() != null) {
- batchStart = true;
- do {
- if (batchStart) {
- batchStart = false;
- } else {
- reuse = loadNextRecord();
- if (reuse == null) {
- break;
- }
- }
- counter++;
- tupleBatch.add(reuse.getObject());
- resetReuse();
- } while (counter < batchSize);
- reducer.reduce(tupleBatch, collector);
- tupleBatch.clear();
- counter = 0;
- }
-
- }
-
- @Override
- protected void mutableInvoke() throws Exception {
- userIterator = new CounterIterator();
-
- do {
- if (userIterator.hasNext()) {
- reducer.reduce(userIterable, collector);
- userIterator.reset();
+ reuse = loadNextRecord();
+ ArrayList<IN> list;
+
+ while (!state.isFull()) {
+ list = new ArrayList<IN>(granularity);
+ try {
+ state.pushBack(fillArray(list));
+ } catch (NullPointerException e) {
+ throw new RuntimeException("DataStream length must be greater than batchsize");
}
- } while (reuse != null);
- }
-
- private class CounterIterator implements BatchIterator<IN> {
- private int counter;
- private boolean loadedNext;
-
- public CounterIterator() {
- counter = 1;
}
- @Override
- public boolean hasNext() {
- if (counter > batchSize) {
- return false;
- } else if (!loadedNext) {
- loadNextRecord();
- loadedNext = true;
- }
- return (reuse != null);
- }
+ boolean go = reduce();
- @Override
- public IN next() {
- if (hasNext()) {
- counter++;
- loadedNext = false;
- return reuse.getObject();
+ while (go) {
+ if (state.isEmittable()) {
+ go = reduce();
} else {
- counter++;
- loadedNext = false;
- return null;
+ list = (ArrayList<IN>) state.popFront();
+ list.clear();
+ state.pushBack(fillArray(list));
+ emitted = false;
+ go = reuse != null;
}
}
-
- public void reset() {
- for (int i = 0; i < (batchSize - counter); i++) {
- loadNextRecord();
- }
- loadNextRecord();
- loadedNext = true;
- counter = 1;
+ if (!emitted) {
+ reduce();
}
+ }
- @Override
- public void remove() {
+ private boolean reduce() throws Exception {
+ userIterator = state.getIterator();
+ reducer.reduce(userIterable, collector);
+ emitted = true;
+ return reuse != null;
+ }
- }
+ private ArrayList<IN> fillArray(ArrayList<IN> list) {
+ int counter = 0;
+ do {
+ counter++;
+ list.add(reuse.getObject());
+ resetReuse();
+ } while ((reuse = loadNextRecord()) != null && counter < granularity);
+ return list;
+ }
+ @Override
+ protected void mutableInvoke() throws Exception {
+ throw new RuntimeException("Reducing mutable sliding batch is not supported.");
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception{
+ super.open(parameters);
+ this.state = new SlidingWindowState<IN>(batchSize, slideSize, granularity);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e089959c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
index 3405641..cbc242c 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
@@ -20,105 +20,71 @@
package org.apache.flink.streaming.api.invokable.operator;
import java.util.ArrayList;
-import java.util.List;
import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.state.SlidingWindowState;
public class WindowReduceInvokable<IN, OUT> extends StreamReduceInvokable<IN, OUT> {
private static final long serialVersionUID = 1L;
private long windowSize;
+ private long slideInterval;
+ private long timeUnitInMillis;
+ private transient SlidingWindowState<IN> state;
volatile boolean isRunning;
- boolean window;
- public WindowReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long windowSize) {
+ public WindowReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long windowSize,
+ long slideInterval, long timeUnitInMillis) {
super(reduceFunction);
- this.reducer = reduceFunction;
this.windowSize = windowSize;
- this.window = true;
+ this.slideInterval = slideInterval;
+ this.timeUnitInMillis = timeUnitInMillis;
}
protected void immutableInvoke() throws Exception {
- List<IN> tupleBatch = new ArrayList<IN>();
- boolean batchStart;
-
- long startTime = System.currentTimeMillis();
- while (loadNextRecord() != null) {
- batchStart = true;
- do {
- if (batchStart) {
- batchStart = false;
- } else {
- reuse = loadNextRecord();
- if (reuse == null) {
- break;
- }
- }
- tupleBatch.add(reuse.getObject());
- resetReuse();
- } while (System.currentTimeMillis() - startTime < windowSize);
- reducer.reduce(tupleBatch, collector);
- tupleBatch.clear();
- startTime = System.currentTimeMillis();
+ if ((reuse = loadNextRecord()) == null) {
+ throw new RuntimeException("DataStream must not be empty");
}
- }
-
- protected void mutableInvoke() throws Exception {
- userIterator = new WindowIterator();
-
- do {
- if (userIterator.hasNext()) {
- reducer.reduce(userIterable, collector);
- userIterator.reset();
- }
- } while (reuse != null);
- }
-
- private class WindowIterator implements BatchIterator<IN> {
-
- private boolean loadedNext;
- private long startTime;
-
- public WindowIterator() {
- startTime = System.currentTimeMillis();
+ while (reuse != null && !state.isFull()) {
+ collectOneTimeUnit();
}
+ reduce();
- @Override
- public boolean hasNext() {
- if (System.currentTimeMillis() - startTime > windowSize) {
- return false;
- } else if (!loadedNext) {
- loadNextRecord();
- loadedNext = true;
- }
- return (reuse != null);
- }
-
- @Override
- public IN next() {
- if (hasNext()) {
- loadedNext = false;
- return reuse.getObject();
- } else {
- loadedNext = false;
- return reuse.getObject();
+ while (reuse != null) {
+ for (int i = 0; i < slideInterval / timeUnitInMillis; i++) {
+ collectOneTimeUnit();
}
+ reduce();
}
+ }
- public void reset() {
- while (System.currentTimeMillis() - startTime < windowSize) {
- loadNextRecord();
- }
- loadNextRecord();
- loadedNext = true;
- startTime = System.currentTimeMillis();
- }
+ private void collectOneTimeUnit() {
+ ArrayList<IN> list;
+ list = new ArrayList<IN>();
+ long startTime = System.currentTimeMillis();
- @Override
- public void remove() {
+ do {
+ list.add(reuse.getObject());
+ resetReuse();
+ } while ((reuse = loadNextRecord()) != null
+ && System.currentTimeMillis() - startTime < timeUnitInMillis);
+ state.pushBack(list);
+ }
- }
+ private boolean reduce() throws Exception {
+ userIterator = state.forceGetIterator();
+ reducer.reduce(userIterable, collector);
+ return reuse != null;
+ }
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ this.state = new SlidingWindowState<IN>(windowSize, slideInterval, timeUnitInMillis);
}
+ protected void mutableInvoke() throws Exception {
+ throw new RuntimeException("Reducing mutable sliding window is not supported.");
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e089959c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowState.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowState.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowState.java
index 600b69f..a062ba9 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowState.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowState.java
@@ -20,43 +20,43 @@
package org.apache.flink.streaming.state;
import java.io.Serializable;
-import java.util.ArrayList;
+import java.util.List;
import org.apache.commons.collections.buffer.CircularFifoBuffer;
-import org.apache.flink.api.java.tuple.Tuple;
-
/**
* The window state for window operator. To be general enough, this class
* implements a count based window operator. It is possible for the user to
* compose time based window operator by extending this class by splitting the
* stream into multiple mini batches.
*/
-public class SlidingWindowState<InTuple extends Tuple> implements Serializable {
+public class SlidingWindowState<T> implements Serializable {
private static final long serialVersionUID = -2376149970115888901L;
- private int currentRecordCount;
+ private long currentRecordCount;
private int fullRecordCount;
private int slideRecordCount;
+ private SlidingWindowStateIterator<T> iterator;
- CircularFifoBuffer buffer;
+ private CircularFifoBuffer buffer;
- public SlidingWindowState(int windowSize, int slidingStep, int computeGranularity) {
+ public SlidingWindowState(long windowSize, long slideInterval, long timeUnitInMillis) {
this.currentRecordCount = 0;
// here we assume that windowSize and slidingStep is divisible by
- // computeGranularity.
- this.fullRecordCount = windowSize / computeGranularity;
- this.slideRecordCount = slidingStep / computeGranularity;
+ // computationGranularity.
+ this.fullRecordCount = (int) (windowSize / timeUnitInMillis);
+ this.slideRecordCount = (int) (slideInterval / timeUnitInMillis);
this.buffer = new CircularFifoBuffer(fullRecordCount);
+ this.iterator = new SlidingWindowStateIterator<T>(buffer);
}
- public void pushBack(ArrayList<InTuple> tupleArray) {
- buffer.add(tupleArray);
+ public void pushBack(List<T> array) {
+ buffer.add(array);
currentRecordCount += 1;
}
@SuppressWarnings("unchecked")
- public ArrayList<InTuple> popFront() {
- ArrayList<InTuple> frontRecord = (ArrayList<InTuple>) buffer.get();
+ public List<T> popFront() {
+ List<T> frontRecord = (List<T>) buffer.get();
buffer.remove();
return frontRecord;
}
@@ -65,6 +65,20 @@ public class SlidingWindowState<InTuple extends Tuple> implements Serializable {
return currentRecordCount >= fullRecordCount;
}
+ public SlidingWindowStateIterator<T> getIterator() {
+ if (isFull()) {
+ iterator.reset();
+ return iterator;
+ } else {
+ return null;
+ }
+ }
+
+ public SlidingWindowStateIterator<T> forceGetIterator() {
+ iterator.reset();
+ return iterator;
+ }
+
public boolean isEmittable() {
if (currentRecordCount == fullRecordCount + slideRecordCount) {
currentRecordCount -= slideRecordCount;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e089959c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowStateIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowStateIterator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowStateIterator.java
index 69c9a48..6033276 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowStateIterator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowStateIterator.java
@@ -19,18 +19,45 @@
package org.apache.flink.streaming.state;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import java.util.Collection;
+import java.util.Iterator;
-public class SlidingWindowStateIterator<K>{
+import org.apache.commons.collections.buffer.CircularFifoBuffer;
+import org.apache.flink.streaming.api.invokable.operator.BatchIterator;
+
+public class SlidingWindowStateIterator<T> implements BatchIterator<T> {
+
+ private CircularFifoBuffer buffer;
+ private Iterator<Collection<T>> iterator;
+ private Iterator<T> subIterator;
+
+ public SlidingWindowStateIterator(CircularFifoBuffer buffer) {
+ this.buffer = buffer;
+ }
public boolean hasNext() {
- return false;
+ return subIterator.hasNext();
}
- public Tuple2<K, StreamRecord<Tuple>> next() {
- return null;
+ public T next() {
+ T nextElement = subIterator.next();
+ if (!subIterator.hasNext()) {
+ if (iterator.hasNext()) {
+ subIterator = iterator.next().iterator();
+ }
+ }
+ return nextElement;
}
+ @Override
+ public void remove() {
+ throw new RuntimeException("Cannot use remove on reducing iterator.");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void reset() {
+ iterator = buffer.iterator();
+ subIterator = iterator.next().iterator();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e089959c/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
index c91878b..bf7621e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
@@ -22,10 +22,14 @@ package org.apache.flink.streaming.api.invokable.operator;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
@@ -39,64 +43,155 @@ public class BatchReduceTest {
private static ArrayList<Double> avgs = new ArrayList<Double>();
private static final int BATCH_SIZE = 5;
- private static final int PARALlELISM = 1;
+ private static final int PARALLELISM = 1;
private static final long MEMORYSIZE = 32;
- public static final class MyBatchReduce implements
- GroupReduceFunction<Tuple1<Double>, Tuple1<Double>> {
+ public static final class MyBatchReduce implements GroupReduceFunction<Double, Double> {
private static final long serialVersionUID = 1L;
@Override
- public void reduce(Iterable<Tuple1<Double>> values, Collector<Tuple1<Double>> out)
- throws Exception {
+ public void reduce(Iterable<Double> values, Collector<Double> out) throws Exception {
Double sum = 0.;
Double count = 0.;
- for (Tuple1<Double> value : values) {
- sum += value.f0;
+ for (Double value : values) {
+ sum += value;
count++;
}
if (count > 0) {
- out.collect(new Tuple1<Double>(sum / count));
+ out.collect(new Double(sum / count));
}
}
}
- public static final class MySink implements SinkFunction<Tuple1<Double>> {
+ public static final class MySink implements SinkFunction<Double> {
private static final long serialVersionUID = 1L;
@Override
- public void invoke(Tuple1<Double> tuple) {
- avgs.add(tuple.f0);
+ public void invoke(Double tuple) {
+ avgs.add(tuple);
}
}
- public static final class MySource implements SourceFunction<Tuple1<Double>> {
+ public static final class MySource implements SourceFunction<Double> {
private static final long serialVersionUID = 1L;
@Override
- public void invoke(Collector<Tuple1<Double>> collector) {
+ public void invoke(Collector<Double> collector) {
for (Double i = 1.; i <= 100; i++) {
- collector.collect(new Tuple1<Double>(i));
+ collector.collect(new Double(i));
}
}
}
- @Test
- public void test() throws Exception {
- LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
+ public static final class MySlidingBatchReduce implements RichFunction,
+ GroupReduceFunction<Long, String> {
+ private static final long serialVersionUID = 1L;
- LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(PARALlELISM);
+ double startTime;
- @SuppressWarnings("unused")
- DataStream<Tuple1<Double>> dataStream = env.addSource(new MySource())
- .batchReduce(new MyBatchReduce(), BATCH_SIZE).addSink(new MySink());
+ @Override
+ public void reduce(Iterable<Long> values, Collector<String> out) throws Exception {
+ for (Long value : values) {
+ out.collect(value.toString());
+ }
+ out.collect(END_OF_BATCH);
+ }
- env.executeTest(MEMORYSIZE);
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ startTime = (double) System.currentTimeMillis() / 1000;
+ }
+
+ @Override
+ public void close() throws Exception {
+ }
+
+ @Override
+ public RuntimeContext getRuntimeContext() {
+ return null;
+ }
+
+ @Override
+ public void setRuntimeContext(RuntimeContext t) {
+ // TODO Auto-generated method stub
+
+ }
+ }
+
+ private static List<SortedSet<String>> sink = new ArrayList<SortedSet<String>>();
+ private static final String END_OF_BATCH = "end of batch";
+
+ public static final class MySlidingSink implements SinkFunction<String> {
+ private static final long serialVersionUID = 1L;
+
+ SortedSet<String> currentSet = new TreeSet<String>();
+
+ @Override
+ public void invoke(String string) {
+ if (string.equals(END_OF_BATCH)) {
+ sink.add(currentSet);
+ currentSet = new TreeSet<String>();
+ } else {
+ currentSet.add(string);
+ }
+ }
+ }
+
+ private final static int SLIDING_BATCH_SIZE = 9;
+ private final static int SLIDE_SIZE = 6;
+ private static final int SEQUENCE_SIZE = 30;
+ private LocalStreamEnvironment env;
+
+ private void slidingStream() {
+ env.generateSequence(1, SEQUENCE_SIZE)
+ .batchReduce(new MySlidingBatchReduce(), SLIDING_BATCH_SIZE, SLIDE_SIZE)
+ .addSink(new MySlidingSink());
+ }
+
+ private void slidingTest() {
+ int firstInBatch = 1;
+
+ for (SortedSet<String> set : sink) {
+ int to = Math.min(firstInBatch + SLIDING_BATCH_SIZE - 1, SEQUENCE_SIZE);
+ assertEquals(getExpectedSet(to), set);
+ firstInBatch += SLIDE_SIZE;
+ }
+ }
+
+ private void nonSlidingStream() {
+ env.addSource(new MySource()).batchReduce(new MyBatchReduce(), BATCH_SIZE)
+ .addSink(new MySink());
+ }
+
+ private void nonSlidingTest() {
for (int i = 0; i < avgs.size(); i++) {
assertEquals(3.0 + i * BATCH_SIZE, avgs.get(i), 0);
}
}
+
+ @Test
+ public void test() {
+ LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
+
+ env = StreamExecutionEnvironment.createLocalEnvironment(PARALLELISM);
+
+ slidingStream();
+ nonSlidingStream();
+
+ env.executeTest(MEMORYSIZE);
+
+ slidingTest();
+ nonSlidingTest();
+ }
+
+ private SortedSet<String> getExpectedSet(int to) {
+ SortedSet<String> expectedSet = new TreeSet<String>();
+ for (int i = to; i > to - SLIDING_BATCH_SIZE; i--) {
+ expectedSet.add(Integer.toString(i));
+ }
+ return expectedSet;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e089959c/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/SlidingWindowStateTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/SlidingWindowStateTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/SlidingWindowStateTest.java
new file mode 100644
index 0000000..15902f4
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/SlidingWindowStateTest.java
@@ -0,0 +1,77 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.state;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.junit.Test;
+
+public class SlidingWindowStateTest {
+
+ private final static int SLIDING_BATCH_SIZE = 3;
+ private final static int SLIDE_SIZE = 2;
+ private static final int UNIT = 1;
+
+ @Test
+ public void test() {
+ SlidingWindowState<Integer> state = new SlidingWindowState<Integer>(SLIDING_BATCH_SIZE,
+ SLIDE_SIZE, UNIT);
+ state.pushBack(Arrays.asList(0));
+ state.pushBack(Arrays.asList(1));
+ assertEquals(false, state.isFull());
+ state.pushBack(Arrays.asList(2));
+ assertTrue(state.isFull());
+
+ SlidingWindowStateIterator<Integer> iterator = state.getIterator();
+
+ SortedSet<Integer> actualSet = new TreeSet<Integer>();
+ while (iterator.hasNext()) {
+ actualSet.add(iterator.next());
+ }
+ assertEquals(getExpectedSet(0, 2), actualSet);
+ actualSet.clear();
+
+ state.pushBack(Arrays.asList(3));
+ assertEquals(false, state.isEmittable());
+ state.pushBack(Arrays.asList(4));
+ assertTrue(state.isEmittable());
+
+ iterator = state.getIterator();
+
+ while (iterator.hasNext()) {
+ actualSet.add(iterator.next());
+ }
+ assertEquals(getExpectedSet(2, 4), actualSet);
+ }
+
+ private SortedSet<Integer> getExpectedSet(int from, int to) {
+ SortedSet<Integer> expectedSet = new TreeSet<Integer>();
+ for (int i = from; i <= to; i++) {
+ expectedSet.add(i);
+ }
+ return expectedSet;
+ }
+
+}