You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/25 21:24:36 UTC
[1/3] flink git commit: [FLINK-2891] [streaming] Set keys for
key/value state in window evaluation of fast-path windows.
Repository: flink
Updated Branches:
refs/heads/release-0.10 ec1730bf5 -> 65fcd3abd
[FLINK-2891] [streaming] Set keys for key/value state in window evaluation of fast-path windows.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/65fcd3ab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/65fcd3ab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/65fcd3ab
Branch: refs/heads/release-0.10
Commit: 65fcd3abdd4d9ad78c12b95ac7ad0b1e2926609e
Parents: 8e4cb0a
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Oct 22 18:58:29 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Oct 25 19:12:28 2015 +0100
----------------------------------------------------------------------
.../api/operators/AbstractStreamOperator.java | 9 +
...ractAlignedProcessingTimeWindowOperator.java | 2 +-
.../windowing/AbstractKeyedTimePanes.java | 3 +-
.../windowing/AccumulatingKeyedTimePanes.java | 22 +-
.../windowing/AggregatingKeyedTimePanes.java | 13 +-
...AlignedProcessingTimeWindowOperatorTest.java | 116 ++++-
...AlignedProcessingTimeWindowOperatorTest.java | 420 +++++++++++++++----
7 files changed, 485 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/65fcd3ab/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 078679d..9074b7a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -343,6 +343,15 @@ public abstract class AbstractStreamOperator<OUT>
}
}
}
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public void setKeyContext(Object key) {
+ if (keyValueStates != null) {
+ for (KvState kv : keyValueStates) {
+ kv.setCurrentKey(key);
+ }
+ }
+ }
// ------------------------------------------------------------------------
// Context and chaining properties
http://git-wip-us.apache.org/repos/asf/flink/blob/65fcd3ab/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index 3165f88..90d3d82 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -239,7 +239,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
private void computeWindow(long timestamp) throws Exception {
out.setTimestamp(timestamp);
panes.truncatePanes(numPanesPerWindow);
- panes.evaluateWindow(out, new TimeWindow(timestamp, timestamp + windowSize));
+ panes.evaluateWindow(out, new TimeWindow(timestamp, timestamp + windowSize), this);
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/65fcd3ab/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
index d1cea20..89ce47b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
@@ -47,7 +48,7 @@ public abstract class AbstractKeyedTimePanes<Type, Key, Aggregate, Result> {
public abstract void addElementToLatestPane(Type element) throws Exception;
- public abstract void evaluateWindow(Collector<Result> out, TimeWindow window) throws Exception;
+ public abstract void evaluateWindow(Collector<Result> out, TimeWindow window, AbstractStreamOperator<Result> operator) throws Exception;
public void dispose() {
http://git-wip-us.apache.org/repos/asf/flink/blob/65fcd3ab/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
index c854e6c..e15de8e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.util.UnionIterator;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
@@ -57,16 +58,21 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
}
@Override
- public void evaluateWindow(Collector<Result> out, TimeWindow window) throws Exception {
+ public void evaluateWindow(Collector<Result> out, TimeWindow window,
+ AbstractStreamOperator<Result> operator) throws Exception
+ {
if (previousPanes.isEmpty()) {
// optimized path for single pane case (tumbling window)
for (KeyMap.Entry<Key, ArrayList<Type>> entry : latestPane) {
+ Key key = entry.getKey();
+ operator.setKeyContext(key);
function.apply(entry.getKey(), window, entry.getValue(), out);
}
}
else {
// general code path for multi-pane case
- WindowFunctionTraversal<Key, Type, Result> evaluator = new WindowFunctionTraversal<>(function, window, out);
+ WindowFunctionTraversal<Key, Type, Result> evaluator = new WindowFunctionTraversal<>(
+ function, window, out, operator);
traverseAllPanes(evaluator, evaluationPass);
}
@@ -84,16 +90,21 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
private final UnionIterator<Type> unionIterator;
private final Collector<Result> out;
+
+ private final TimeWindow window;
+
+ private final AbstractStreamOperator<Result> contextOperator;
private Key currentKey;
+
- private TimeWindow window;
-
- WindowFunctionTraversal(WindowFunction<Type, Result, Key, Window> function, TimeWindow window, Collector<Result> out) {
+ WindowFunctionTraversal(WindowFunction<Type, Result, Key, Window> function, TimeWindow window,
+ Collector<Result> out, AbstractStreamOperator<Result> contextOperator) {
this.function = function;
this.out = out;
this.unionIterator = new UnionIterator<>();
this.window = window;
+ this.contextOperator = contextOperator;
}
@@ -110,6 +121,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
@Override
public void keyDone() throws Exception {
+ contextOperator.setKeyContext(currentKey);
function.apply(currentKey, window, unionIterator, out);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/65fcd3ab/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
index d395b2a..8599bc1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
@@ -50,7 +51,8 @@ public class AggregatingKeyedTimePanes<Type, Key> extends AbstractKeyedTimePanes
}
@Override
- public void evaluateWindow(Collector<Type> out, TimeWindow window) throws Exception {
+ public void evaluateWindow(Collector<Type> out, TimeWindow window,
+ AbstractStreamOperator<Type> operator) throws Exception {
if (previousPanes.isEmpty()) {
// optimized path for single pane case
for (KeyMap.Entry<Key, Type> entry : latestPane) {
@@ -59,7 +61,7 @@ public class AggregatingKeyedTimePanes<Type, Key> extends AbstractKeyedTimePanes
}
else {
// general code path for multi-pane case
- AggregatingTraversal<Key, Type> evaluator = new AggregatingTraversal<>(reducer, out);
+ AggregatingTraversal<Key, Type> evaluator = new AggregatingTraversal<>(reducer, out, operator);
traverseAllPanes(evaluator, evaluationPass);
}
@@ -76,16 +78,21 @@ public class AggregatingKeyedTimePanes<Type, Key> extends AbstractKeyedTimePanes
private final Collector<Type> out;
+ private final AbstractStreamOperator<Type> operator;
+
private Type currentValue;
- AggregatingTraversal(ReduceFunction<Type> function, Collector<Type> out) {
+ AggregatingTraversal(ReduceFunction<Type> function, Collector<Type> out,
+ AbstractStreamOperator<Type> operator) {
this.function = function;
this.out = out;
+ this.operator = operator;
}
@Override
public void startNewKey(Key key) {
currentValue = null;
+ operator.setKeyContext(key);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/65fcd3ab/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index ad3c838..62eb268 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -20,11 +20,15 @@ package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
@@ -48,7 +52,9 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -89,6 +95,13 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
// ------------------------------------------------------------------------
+ public AccumulatingAlignedProcessingTimeWindowOperatorTest() {
+ ClosureCleaner.clean(identitySelector, false);
+ ClosureCleaner.clean(validatingIdentityFunction, false);
+ }
+
+ // ------------------------------------------------------------------------
+
@After
public void checkNoTriggerThreadsRunning() {
// make sure that all the threads we trigger are shut down
@@ -544,6 +557,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
// inject some elements
final int numElementsFirst = 700;
+ final int numElements = 1000;
for (int i = 0; i < numElementsFirst; i++) {
synchronized (lock) {
op.processElement(new StreamRecord<Integer>(i));
@@ -560,6 +574,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
resultAtSnapshot = new ArrayList<>(out.getElements());
int afterSnapShot = out.getElements().size();
assertEquals("operator performed computation during snapshot", beforeSnapShot, afterSnapShot);
+ assertTrue(afterSnapShot <= numElementsFirst);
}
// inject some random elements, which should not show up in the state
@@ -584,7 +599,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
op.open();
// inject some more elements
- final int numElements = 1000;
for (int i = numElementsFirst; i < numElements; i++) {
synchronized (lock) {
op.processElement(new StreamRecord<Integer>(i));
@@ -725,6 +739,64 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
}
}
+ @Test
+ public void testKeyValueStateInWindowFunction() {
+ final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
+ try {
+ final CollectingOutput<Integer> out = new CollectingOutput<>(50);
+ final Object lock = new Object();
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+
+ StatefulFunction.globalCounts.clear();
+
+ // tumbling window that triggers every 50 milliseconds
+ AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+ new AccumulatingProcessingTimeWindowOperator<>(
+ new StatefulFunction(), identitySelector,
+ IntSerializer.INSTANCE, IntSerializer.INSTANCE, 50, 50);
+
+ op.setup(mockTask, createTaskConfig(identitySelector, IntSerializer.INSTANCE), out);
+ op.open();
+
+ synchronized (lock) {
+ op.processElement(new StreamRecord<Integer>(1));
+ op.processElement(new StreamRecord<Integer>(2));
+ }
+ out.waitForNElements(2, 60000);
+
+ synchronized (lock) {
+ op.processElement(new StreamRecord<Integer>(1));
+ op.processElement(new StreamRecord<Integer>(2));
+ op.processElement(new StreamRecord<Integer>(1));
+ op.processElement(new StreamRecord<Integer>(1));
+ op.processElement(new StreamRecord<Integer>(2));
+ op.processElement(new StreamRecord<Integer>(2));
+ }
+ out.waitForNElements(8, 60000);
+
+ List<Integer> result = out.getElements();
+ assertEquals(8, result.size());
+
+ Collections.sort(result);
+ assertEquals(Arrays.asList(1, 1, 1, 1, 2, 2, 2, 2), result);
+
+ assertEquals(4, StatefulFunction.globalCounts.get(1).intValue());
+ assertEquals(4, StatefulFunction.globalCounts.get(2).intValue());
+
+ synchronized (lock) {
+ op.close();
+ }
+ op.dispose();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ finally {
+ timerService.shutdown();
+ }
+ }
+
// ------------------------------------------------------------------------
private void assertInvalidParameter(long windowSize, long windowSlide) {
@@ -771,6 +843,41 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
}
}
+ // ------------------------------------------------------------------------
+
+ private static class StatefulFunction extends RichWindowFunction<Integer, Integer, Integer, TimeWindow> {
+
+ // we use a concurrent map here even though there is no concurrency, to
+ // get "volatile" style access to entries
+ static final Map<Integer, Integer> globalCounts = new ConcurrentHashMap<>();
+
+ private OperatorState<Integer> state;
+
+ @Override
+ public void open(Configuration parameters) {
+ assertNotNull(getRuntimeContext());
+ state = getRuntimeContext().getKeyValueState("totalCount", Integer.class, 0);
+ }
+
+ @Override
+ public void apply(Integer key,
+ TimeWindow window,
+ Iterable<Integer> values,
+ Collector<Integer> out) throws Exception {
+ for (Integer i : values) {
+ // we need to update this state before emitting elements. Else, the test's main
+ // thread will have received all output elements before the state is updated and
+ // the checks may fail
+ state.update(state.value() + 1);
+ globalCounts.put(key, state.value());
+
+ out.collect(i);
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
private static StreamTask<?, ?> createMockTask() {
StreamTask<?, ?> task = mock(StreamTask.class);
when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>());
@@ -821,4 +928,11 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
return mockTask;
}
+
+ private static StreamConfig createTaskConfig(KeySelector<?, ?> partitioner, TypeSerializer<?> keySerializer) {
+ StreamConfig cfg = new StreamConfig(new Configuration());
+ cfg.setStatePartitioner(partitioner);
+ cfg.setStateKeySerializer(keySerializer);
+ return cfg;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/65fcd3ab/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index 4bd260f..4d507fb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -21,9 +21,16 @@ package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -33,8 +40,8 @@ import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
-
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+
import org.junit.After;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
@@ -44,15 +51,19 @@ import org.mockito.stubbing.OngoingStubbing;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@@ -70,20 +81,41 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
@SuppressWarnings("unchecked")
private final KeySelector<String, String> mockKeySelector = mock(KeySelector.class);
- private final KeySelector<Integer, Integer> identitySelector = new KeySelector<Integer, Integer>() {
+ private final KeySelector<Tuple2<Integer, Integer>, Integer> fieldOneSelector =
+ new KeySelector<Tuple2<Integer,Integer>, Integer>() {
+ @Override
+ public Integer getKey(Tuple2<Integer,Integer> value) {
+ return value.f0;
+ }
+ };
+
+ private final ReduceFunction<Tuple2<Integer, Integer>> sumFunction = new ReduceFunction<Tuple2<Integer, Integer>>() {
@Override
- public Integer getKey(Integer value) {
- return value;
+ public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) {
+ return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
}
};
- private final ReduceFunction<Integer> sumFunction = new ReduceFunction<Integer>() {
+ private final TypeSerializer<Tuple2<Integer, Integer>> tupleSerializer =
+ new TupleTypeInfo<Tuple2<Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)
+ .createSerializer(new ExecutionConfig());
+
+ private final Comparator<Tuple2<Integer, Integer>> tupleComparator = new Comparator<Tuple2<Integer, Integer>>() {
@Override
- public Integer reduce(Integer value1, Integer value2) {
- return value1 + value2;
+ public int compare(Tuple2<Integer, Integer> o1, Tuple2<Integer, Integer> o2) {
+ int diff0 = o1.f0 - o2.f0;
+ int diff1 = o1.f1 - o2.f1;
+ return diff0 != 0 ? diff0 : diff1;
}
};
+
+ // ------------------------------------------------------------------------
+ public AggregatingAlignedProcessingTimeWindowOperatorTest() {
+ ClosureCleaner.clean(fieldOneSelector, false);
+ ClosureCleaner.clean(sumFunction, false);
+ }
+
// ------------------------------------------------------------------------
@After
@@ -211,12 +243,12 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
try {
final int windowSize = 50;
- final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
+ final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(windowSize);
- AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
+ AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
new AggregatingProcessingTimeWindowOperator<>(
- sumFunction, identitySelector,
- IntSerializer.INSTANCE, IntSerializer.INSTANCE,
+ sumFunction, fieldOneSelector,
+ IntSerializer.INSTANCE, tupleSerializer,
windowSize, windowSize);
final Object lock = new Object();
@@ -229,7 +261,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
for (int i = 0; i < numElements; i++) {
synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(i));
+ StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
+ op.setKeyContextElement(next);
+ op.processElement(next);
}
Thread.sleep(1);
}
@@ -240,12 +274,13 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
op.dispose();
// get and verify the result
- List<Integer> result = out.getElements();
+ List<Tuple2<Integer, Integer>> result = out.getElements();
assertEquals(numElements, result.size());
- Collections.sort(result);
+ Collections.sort(result, tupleComparator);
for (int i = 0; i < numElements; i++) {
- assertEquals(i, result.get(i).intValue());
+ assertEquals(i, result.get(i).f0.intValue());
+ assertEquals(i, result.get(i).f1.intValue());
}
}
catch (Exception e) {
@@ -263,15 +298,15 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
try {
final int windowSize = 50;
- final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
+ final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(windowSize);
final Object lock = new Object();
final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-
- AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
+
+ AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
new AggregatingProcessingTimeWindowOperator<>(
- sumFunction, identitySelector,
- IntSerializer.INSTANCE, IntSerializer.INSTANCE,
+ sumFunction, fieldOneSelector,
+ IntSerializer.INSTANCE, tupleSerializer,
windowSize, windowSize);
op.setup(mockTask, new StreamConfig(new Configuration()), out);
@@ -286,8 +321,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
synchronized (lock) {
long nextTime = op.getNextEvaluationTime();
int val = ((int) nextTime) ^ ((int) (nextTime >>> 32));
-
- op.processElement(new StreamRecord<Integer>(val));
+
+ StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(val, val));
+ op.setKeyContextElement(next);
+ op.processElement(next);
if (nextTime != previousNextTime) {
window++;
@@ -302,14 +339,14 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
}
op.dispose();
- List<Integer> result = out.getElements();
+ List<Tuple2<Integer, Integer>> result = out.getElements();
// we have ideally one element per window. we may have more, when we emitted a value into the
// successive window (corner case), so we can have twice the number of elements, in the worst case.
assertTrue(result.size() >= numWindows && result.size() <= 2 * numWindows);
// deduplicate for more accurate checks
- HashSet<Integer> set = new HashSet<>(result);
+ HashSet<Tuple2<Integer, Integer>> set = new HashSet<>(result);
assertTrue(set.size() == 10);
}
catch (Exception e) {
@@ -325,16 +362,16 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
public void testSlidingWindow() {
final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
try {
- final CollectingOutput<Integer> out = new CollectingOutput<>(50);
+ final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(50);
final Object lock = new Object();
final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
// tumbling window that triggers every 20 milliseconds
- AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
+ AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
new AggregatingProcessingTimeWindowOperator<>(
- sumFunction, identitySelector,
- IntSerializer.INSTANCE, IntSerializer.INSTANCE,
+ sumFunction, fieldOneSelector,
+ IntSerializer.INSTANCE, tupleSerializer,
150, 50);
op.setup(mockTask, new StreamConfig(new Configuration()), out);
@@ -344,7 +381,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
for (int i = 0; i < numElements; i++) {
synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(i));
+ StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
+ op.setKeyContextElement(next);
+ op.processElement(next);
}
Thread.sleep(1);
}
@@ -355,7 +394,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
op.dispose();
// get and verify the result
- List<Integer> result = out.getElements();
+ List<Tuple2<Integer, Integer>> result = out.getElements();
// every element can occur between one and three times
if (result.size() < numElements || result.size() > 3 * numElements) {
@@ -363,17 +402,19 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
fail("Wrong number of results: " + result.size());
}
- Collections.sort(result);
+ Collections.sort(result, tupleComparator);
int lastNum = -1;
int lastCount = -1;
- for (int num : result) {
- if (num == lastNum) {
+ for (Tuple2<Integer, Integer> val : result) {
+ assertEquals(val.f0, val.f1);
+
+ if (val.f0 == lastNum) {
lastCount++;
assertTrue(lastCount <= 3);
}
else {
- lastNum = num;
+ lastNum = val.f0;
lastCount = 1;
}
}
@@ -392,33 +433,45 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
try {
- final CollectingOutput<Integer> out = new CollectingOutput<>(50);
+ final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(50);
final Object lock = new Object();
final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
// tumbling window that triggers every 20 milliseconds
- AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
+ AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
new AggregatingProcessingTimeWindowOperator<>(
- sumFunction, identitySelector,
- IntSerializer.INSTANCE, IntSerializer.INSTANCE, 150, 50);
+ sumFunction, fieldOneSelector,
+ IntSerializer.INSTANCE, tupleSerializer, 150, 50);
op.setup(mockTask, new StreamConfig(new Configuration()), out);
op.open();
synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(1));
- op.processElement(new StreamRecord<Integer>(2));
+ StreamRecord<Tuple2<Integer, Integer>> next1 = new StreamRecord<>(new Tuple2<>(1, 1));
+ op.setKeyContextElement(next1);
+ op.processElement(next1);
+
+ StreamRecord<Tuple2<Integer, Integer>> next2 = new StreamRecord<>(new Tuple2<>(2, 2));
+ op.setKeyContextElement(next2);
+ op.processElement(next2);
}
// each element should end up in the output three times
// wait until the elements have arrived 6 times in the output
out.waitForNElements(6, 120000);
- List<Integer> result = out.getElements();
+ List<Tuple2<Integer, Integer>> result = out.getElements();
assertEquals(6, result.size());
- Collections.sort(result);
- assertEquals(Arrays.asList(1, 1, 1, 2, 2, 2), result);
+ Collections.sort(result, tupleComparator);
+ assertEquals(Arrays.asList(
+ new Tuple2<>(1, 1),
+ new Tuple2<>(1, 1),
+ new Tuple2<>(1, 1),
+ new Tuple2<>(2, 2),
+ new Tuple2<>(2, 2),
+ new Tuple2<>(2, 2)
+ ), result);
synchronized (lock) {
op.close();
@@ -438,15 +491,16 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
public void testEmitTrailingDataOnClose() {
final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
try {
- final CollectingOutput<Integer> out = new CollectingOutput<>();
+ final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>();
final Object lock = new Object();
final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
// the operator has a window time that is so long that it will not fire in this test
final long oneYear = 365L * 24 * 60 * 60 * 1000;
- AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
- new AggregatingProcessingTimeWindowOperator<>(sumFunction, identitySelector,
- IntSerializer.INSTANCE, IntSerializer.INSTANCE, oneYear, oneYear);
+ AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
+ new AggregatingProcessingTimeWindowOperator<>(
+ sumFunction, fieldOneSelector,
+ IntSerializer.INSTANCE, tupleSerializer, oneYear, oneYear);
op.setup(mockTask, new StreamConfig(new Configuration()), out);
op.open();
@@ -454,7 +508,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
for (Integer i : data) {
synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(i));
+ StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
+ op.setKeyContextElement(next);
+ op.processElement(next);
}
}
@@ -464,9 +520,14 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
op.dispose();
// get and verify the result
- List<Integer> result = out.getElements();
- Collections.sort(result);
- assertEquals(data, result);
+ List<Tuple2<Integer, Integer>> result = out.getElements();
+ assertEquals(data.size(), result.size());
+
+ Collections.sort(result, tupleComparator);
+ for (int i = 0; i < data.size(); i++) {
+ assertEquals(data.get(i), result.get(i).f0);
+ assertEquals(data.get(i), result.get(i).f1);
+ }
}
catch (Exception e) {
e.printStackTrace();
@@ -481,18 +542,18 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
public void testPropagateExceptionsFromProcessElement() {
final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
try {
- final CollectingOutput<Integer> out = new CollectingOutput<>();
+ final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>();
final Object lock = new Object();
final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
- ReduceFunction<Integer> failingFunction = new FailingFunction(100);
+ ReduceFunction<Tuple2<Integer, Integer>> failingFunction = new FailingFunction(100);
// the operator has a window time that is so long that it will not fire in this test
final long hundredYears = 100L * 365 * 24 * 60 * 60 * 1000;
- AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
+ AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
new AggregatingProcessingTimeWindowOperator<>(
- failingFunction, identitySelector,
- IntSerializer.INSTANCE, IntSerializer.INSTANCE,
+ failingFunction, fieldOneSelector,
+ IntSerializer.INSTANCE, tupleSerializer,
hundredYears, hundredYears);
op.setup(mockTask, new StreamConfig(new Configuration()), out);
@@ -500,12 +561,16 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
for (int i = 0; i < 100; i++) {
synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(1));
+ StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(1, 1));
+ op.setKeyContextElement(next);
+ op.processElement(next);
}
}
try {
- op.processElement(new StreamRecord<Integer>(1));
+ StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(1, 1));
+ op.setKeyContextElement(next);
+ op.processElement(next);
fail("This fail with an exception");
}
catch (Exception e) {
@@ -528,15 +593,15 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
try {
final int windowSize = 200;
- final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
+ final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(windowSize);
final Object lock = new Object();
final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
// tumbling window that triggers every 50 milliseconds
- AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
+ AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
new AggregatingProcessingTimeWindowOperator<>(
- sumFunction, identitySelector,
- IntSerializer.INSTANCE, IntSerializer.INSTANCE,
+ sumFunction, fieldOneSelector,
+ IntSerializer.INSTANCE, tupleSerializer,
windowSize, windowSize);
op.setup(mockTask, new StreamConfig(new Configuration()), out);
@@ -548,14 +613,16 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
for (int i = 0; i < numElementsFirst; i++) {
synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(i));
+ StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
+ op.setKeyContextElement(next);
+ op.processElement(next);
}
Thread.sleep(1);
}
// draw a snapshot and dispose the window
StreamTaskState state;
- List<Integer> resultAtSnapshot;
+ List<Tuple2<Integer, Integer>> resultAtSnapshot;
synchronized (lock) {
int beforeSnapShot = out.getElements().size();
state = op.snapshotOperatorState(1L, System.currentTimeMillis());
@@ -569,7 +636,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
// inject some random elements, which should not show up in the state
for (int i = numElementsFirst; i < numElements; i++) {
synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(i));
+ StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
+ op.setKeyContextElement(next);
+ op.processElement(next);
}
Thread.sleep(1);
}
@@ -577,10 +646,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
op.dispose();
// re-create the operator and restore the state
- final CollectingOutput<Integer> out2 = new CollectingOutput<>(windowSize);
+ final CollectingOutput<Tuple2<Integer, Integer>> out2 = new CollectingOutput<>(windowSize);
op = new AggregatingProcessingTimeWindowOperator<>(
- sumFunction, identitySelector,
- IntSerializer.INSTANCE, IntSerializer.INSTANCE,
+ sumFunction, fieldOneSelector,
+ IntSerializer.INSTANCE, tupleSerializer,
windowSize, windowSize);
op.setup(mockTask, new StreamConfig(new Configuration()), out2);
@@ -590,7 +659,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
// inject the remaining elements
for (int i = numElementsFirst; i < numElements; i++) {
synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(i));
+ StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
+ op.setKeyContextElement(next);
+ op.processElement(next);
}
Thread.sleep(1);
}
@@ -601,13 +672,14 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
op.dispose();
// get and verify the result
- List<Integer> finalResult = new ArrayList<>(resultAtSnapshot);
+ List<Tuple2<Integer, Integer>> finalResult = new ArrayList<>(resultAtSnapshot);
finalResult.addAll(out2.getElements());
assertEquals(numElements, finalResult.size());
- Collections.sort(finalResult);
+ Collections.sort(finalResult, tupleComparator);
for (int i = 0; i < numElements; i++) {
- assertEquals(i, finalResult.get(i).intValue());
+ assertEquals(i, finalResult.get(i).f0.intValue());
+ assertEquals(i, finalResult.get(i).f1.intValue());
}
}
catch (Exception e) {
@@ -627,15 +699,15 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
final int windowSlide = 50;
final int windowSize = factor * windowSlide;
- final CollectingOutput<Integer> out = new CollectingOutput<>(windowSlide);
+ final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(windowSlide);
final Object lock = new Object();
final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
// sliding window (200 msecs) every 50 msecs
- AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
+ AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
new AggregatingProcessingTimeWindowOperator<>(
- sumFunction, identitySelector,
- IntSerializer.INSTANCE, IntSerializer.INSTANCE,
+ sumFunction, fieldOneSelector,
+ IntSerializer.INSTANCE, tupleSerializer,
windowSize, windowSlide);
op.setup(mockTask, new StreamConfig(new Configuration()), out);
@@ -647,14 +719,16 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
for (int i = 0; i < numElementsFirst; i++) {
synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(i));
+ StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
+ op.setKeyContextElement(next);
+ op.processElement(next);
}
Thread.sleep(1);
}
// draw a snapshot
StreamTaskState state;
- List<Integer> resultAtSnapshot;
+ List<Tuple2<Integer, Integer>> resultAtSnapshot;
synchronized (lock) {
int beforeSnapShot = out.getElements().size();
state = op.snapshotOperatorState(1L, System.currentTimeMillis());
@@ -668,7 +742,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
// inject the remaining elements - these should not influence the snapshot
for (int i = numElementsFirst; i < numElements; i++) {
synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(i));
+ StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
+ op.setKeyContextElement(next);
+ op.processElement(next);
}
Thread.sleep(1);
}
@@ -676,10 +752,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
op.dispose();
// re-create the operator and restore the state
- final CollectingOutput<Integer> out2 = new CollectingOutput<>(windowSlide);
+ final CollectingOutput<Tuple2<Integer, Integer>> out2 = new CollectingOutput<>(windowSlide);
op = new AggregatingProcessingTimeWindowOperator<>(
- sumFunction, identitySelector,
- IntSerializer.INSTANCE, IntSerializer.INSTANCE,
+ sumFunction, fieldOneSelector,
+ IntSerializer.INSTANCE, tupleSerializer,
windowSize, windowSlide);
op.setup(mockTask, new StreamConfig(new Configuration()), out2);
@@ -690,7 +766,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
// inject again the remaining elements
for (int i = numElementsFirst; i < numElements; i++) {
synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(i));
+ StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
+ op.setKeyContextElement(next);
+ op.processElement(next);
}
Thread.sleep(1);
}
@@ -710,13 +788,14 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
op.dispose();
// get and verify the result
- List<Integer> finalResult = new ArrayList<>(resultAtSnapshot);
+ List<Tuple2<Integer, Integer>> finalResult = new ArrayList<>(resultAtSnapshot);
finalResult.addAll(out2.getElements());
assertEquals(factor * numElements, finalResult.size());
- Collections.sort(finalResult);
+ Collections.sort(finalResult, tupleComparator);
for (int i = 0; i < factor * numElements; i++) {
- assertEquals(i / factor, finalResult.get(i).intValue());
+ assertEquals(i / factor, finalResult.get(i).f0.intValue());
+ assertEquals(i / factor, finalResult.get(i).f1.intValue());
}
}
catch (Exception e) {
@@ -727,6 +806,134 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
timerService.shutdown();
}
}
+
+ @Test
+ public void testKeyValueStateInWindowFunctionTumbling() {
+ final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
+ try {
+ final long hundredYears = 100L * 365 * 24 * 60 * 60 * 1000;
+
+ final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>();
+ final Object lock = new Object();
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+
+ StatefulFunction.globalCounts.clear();
+
+ AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
+ new AggregatingProcessingTimeWindowOperator<>(
+ new StatefulFunction(), fieldOneSelector,
+ IntSerializer.INSTANCE, tupleSerializer, hundredYears, hundredYears);
+
+ op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE), out);
+ op.open();
+
+ // because the window interval is so large, everything should be in one window
+ // and aggregate into one value per key
+
+ synchronized (lock) {
+ for (int i = 0; i < 10; i++) {
+ StreamRecord<Tuple2<Integer, Integer>> next1 = new StreamRecord<>(new Tuple2<>(1, i));
+ op.setKeyContextElement(next1);
+ op.processElement(next1);
+
+ StreamRecord<Tuple2<Integer, Integer>> next2 = new StreamRecord<>(new Tuple2<>(2, i));
+ op.setKeyContextElement(next2);
+ op.processElement(next2);
+ }
+
+ op.close();
+ }
+
+ List<Tuple2<Integer, Integer>> result = out.getElements();
+ assertEquals(2, result.size());
+
+ Collections.sort(result, tupleComparator);
+ assertEquals(45, result.get(0).f1.intValue());
+ assertEquals(45, result.get(1).f1.intValue());
+
+ assertEquals(10, StatefulFunction.globalCounts.get(1).intValue());
+ assertEquals(10, StatefulFunction.globalCounts.get(2).intValue());
+
+ op.dispose();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ finally {
+ timerService.shutdown();
+ }
+ }
+
+ @Test
+ public void testKeyValueStateInWindowFunctionSliding() {
+ final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
+ try {
+ final int factor = 2;
+ final int windowSlide = 50;
+ final int windowSize = factor * windowSlide;
+
+ final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>();
+ final Object lock = new Object();
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+
+ StatefulFunction.globalCounts.clear();
+
+ AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
+ new AggregatingProcessingTimeWindowOperator<>(
+ new StatefulFunction(), fieldOneSelector,
+ IntSerializer.INSTANCE, tupleSerializer, windowSize, windowSlide);
+
+ op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE), out);
+ op.open();
+
+ // here the window is small (windowSize = factor * windowSlide = 100, windowSlide = 50), so
+ // the elements below may be spread over several windows rather than landing in just one
+ final int numElements = 100;
+
+ // feed numElements rounds of four records each: two with first field 1, two with first field 2
+ for (int i = 0; i < numElements; i++) {
+
+ StreamRecord<Tuple2<Integer, Integer>> next1 = new StreamRecord<>(new Tuple2<>(1, i));
+ StreamRecord<Tuple2<Integer, Integer>> next2 = new StreamRecord<>(new Tuple2<>(2, i));
+ StreamRecord<Tuple2<Integer, Integer>> next3 = new StreamRecord<>(new Tuple2<>(1, i));
+ StreamRecord<Tuple2<Integer, Integer>> next4 = new StreamRecord<>(new Tuple2<>(2, i));
+
+ // because we do not release the lock between elements, they end up in the same windows
+ synchronized (lock) {
+ op.setKeyContextElement(next1);
+ op.processElement(next1);
+ op.setKeyContextElement(next2);
+ op.processElement(next2);
+ op.setKeyContextElement(next3);
+ op.processElement(next3);
+ op.setKeyContextElement(next4);
+ op.processElement(next4);
+ }
+
+ Thread.sleep(1);
+ }
+
+ synchronized (lock) {
+ op.close();
+ }
+
+ int count1 = StatefulFunction.globalCounts.get(1);
+ int count2 = StatefulFunction.globalCounts.get(2);
+
+ assertTrue(count1 >= 2 && count1 <= 2 * numElements);
+ assertEquals(count1, count2);
+
+ op.dispose();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ finally {
+ timerService.shutdown();
+ }
+ }
// ------------------------------------------------------------------------
@@ -748,7 +955,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
// ------------------------------------------------------------------------
- private static class FailingFunction implements ReduceFunction<Integer> {
+ private static class FailingFunction implements ReduceFunction<Tuple2<Integer, Integer>> {
private final int failAfterElements;
@@ -759,16 +966,44 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
}
@Override
- public Integer reduce(Integer value1, Integer value2) throws Exception {
+ public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {
numElements++;
if (numElements >= failAfterElements) {
throw new Exception("Artificial Test Exception");
}
- return value1 + value2;
+ return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
}
}
+
+ // ------------------------------------------------------------------------
+
+ private static class StatefulFunction extends RichReduceFunction<Tuple2<Integer, Integer>> {
+
+ static final Map<Integer, Integer> globalCounts = new ConcurrentHashMap<>();
+
+ private OperatorState<Integer> state;
+
+ @Override
+ public void open(Configuration parameters) {
+ assertNotNull(getRuntimeContext());
+
+ // start with one, so the final count is correct and we test that we do not
+ // initialize with 0 always by default
+ state = getRuntimeContext().getKeyValueState("totalCount", Integer.class, 1);
+ }
+
+ @Override
+ public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {
+ state.update(state.value() + 1);
+ globalCounts.put(value1.f0, state.value());
+
+ return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
+ }
+ }
+
+ // ------------------------------------------------------------------------
private static StreamTask<?, ?> createMockTask() {
StreamTask<?, ?> task = mock(StreamTask.class);
@@ -820,4 +1055,11 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
return mockTask;
}
+
+ private static StreamConfig createTaskConfig(KeySelector<?, ?> partitioner, TypeSerializer<?> keySerializer) {
+ StreamConfig cfg = new StreamConfig(new Configuration());
+ cfg.setStatePartitioner(partitioner);
+ cfg.setStateKeySerializer(keySerializer);
+ return cfg;
+ }
}
[3/3] flink git commit: [FLINK-2866] [runtime] Eagerly close
FSDataInputStream in file state handle
Posted by se...@apache.org.
[FLINK-2866] [runtime] Eagerly close FSDataInputStream in file state handle
This closes #1282
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c2811ce9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c2811ce9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c2811ce9
Branch: refs/heads/release-0.10
Commit: c2811ce975548e18d64a5f3c2b1b397f9e42bc1c
Parents: ec1730b
Author: tedyu <yu...@gmail.com>
Authored: Wed Oct 21 19:40:21 2015 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Oct 25 19:12:28 2015 +0100
----------------------------------------------------------------------
.../runtime/state/filesystem/FileSerializableStateHandle.java | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c2811ce9/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java
index b7e7cd1..63336d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java
@@ -46,8 +46,9 @@ public class FileSerializableStateHandle<T> extends AbstractFileState implements
@Override
@SuppressWarnings("unchecked")
public T getState(ClassLoader classLoader) throws Exception {
- FSDataInputStream inStream = getFileSystem().open(getFilePath());
- ObjectInputStream ois = new InstantiationUtil.ClassLoaderObjectInputStream(inStream, classLoader);
- return (T) ois.readObject();
+ try (FSDataInputStream inStream = getFileSystem().open(getFilePath())) {
+ ObjectInputStream ois = new InstantiationUtil.ClassLoaderObjectInputStream(inStream, classLoader);
+ return (T) ois.readObject();
+ }
}
}
[2/3] flink git commit: [FLINK-2888] [streaming] State backends
return copies of the default values
Posted by se...@apache.org.
[FLINK-2888] [streaming] State backends return copies of the default values
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e4cb0ae
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e4cb0ae
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e4cb0ae
Branch: refs/heads/release-0.10
Commit: 8e4cb0ae0bdc7f8c60542edabad57e4fa2f0c61e
Parents: c2811ce
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Oct 22 11:24:12 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Oct 25 19:12:28 2015 +0100
----------------------------------------------------------------------
.../runtime/state/AbstractHeapKvState.java | 3 +-
.../runtime/state/FileStateBackendTest.java | 40 ++++++++++++++++----
.../runtime/state/MemoryStateBackendTest.java | 31 ++++++++++++---
3 files changed, 61 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8e4cb0ae/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java
index 12250b9..23703b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java
@@ -88,7 +88,8 @@ public abstract class AbstractHeapKvState<K, V, Backend extends StateBackend<Bac
@Override
public V value() {
V value = state.get(currentKey);
- return value != null ? value : defaultValue;
+ return value != null ? value :
+ (defaultValue == null ? null : valueSerializer.copy(defaultValue));
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/8e4cb0ae/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
index 481fb98..7182a36 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
@@ -23,20 +23,16 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.FloatSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.IntValueSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.types.IntValue;
import org.apache.flink.types.StringValue;
-import org.apache.flink.util.OperatingSystem;
import org.junit.Test;
@@ -382,6 +378,36 @@ public class FileStateBackendTest {
deleteDirectorySilently(tempDir);
}
}
+
+ @Test
+ public void testCopyDefaultValue() {
+ File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+ try {
+ FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
+ backend.initializeForJob(new JobID());
+
+ KvState<Integer, IntValue, FsStateBackend> kv =
+ backend.createKvState(IntSerializer.INSTANCE, IntValueSerializer.INSTANCE, new IntValue(-1));
+
+ kv.setCurrentKey(1);
+ IntValue default1 = kv.value();
+
+ kv.setCurrentKey(2);
+ IntValue default2 = kv.value();
+
+ assertNotNull(default1);
+ assertNotNull(default2);
+ assertEquals(default1, default2);
+ assertFalse(default1 == default2);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ finally {
+ deleteDirectorySilently(tempDir);
+ }
+ }
// ------------------------------------------------------------------------
// Utilities
@@ -411,7 +437,7 @@ public class FileStateBackendTest {
}
private static String localFileUri(File path) {
- return (OperatingSystem.isWindows() ? "file:/" : "file://") + path.getAbsolutePath();
+ return path.toURI().toString();
}
private static void validateBytesInStream(InputStream is, byte[] data) throws IOException {
http://git-wip-us.apache.org/repos/asf/flink/blob/8e4cb0ae/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index 5f95b33..87a050b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -21,14 +21,11 @@ package org.apache.flink.runtime.state;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.FloatSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.IntValueSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
-import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.types.IntValue;
import org.apache.flink.types.StringValue;
import org.junit.Test;
@@ -279,4 +276,28 @@ public class MemoryStateBackendTest {
fail(e.getMessage());
}
}
+
+ @Test
+ public void testCopyDefaultValue() {
+ try {
+ MemoryStateBackend backend = new MemoryStateBackend();
+ KvState<Integer, IntValue, MemoryStateBackend> kv =
+ backend.createKvState(IntSerializer.INSTANCE, IntValueSerializer.INSTANCE, new IntValue(-1));
+
+ kv.setCurrentKey(1);
+ IntValue default1 = kv.value();
+
+ kv.setCurrentKey(2);
+ IntValue default2 = kv.value();
+
+ assertNotNull(default1);
+ assertNotNull(default2);
+ assertEquals(default1, default2);
+ assertFalse(default1 == default2);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
}