You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/06/30 14:39:46 UTC
flink git commit: [FLINK-2294] [streaming] Fix partitioned state next-input setting for copying chained collectors
Repository: flink
Updated Branches:
refs/heads/master df4216083 -> fef9f1158
[FLINK-2294] [streaming] Fix partitioned state next-input setting for copying chained collectors
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fef9f115
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fef9f115
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fef9f115
Branch: refs/heads/master
Commit: fef9f115838b3ba3d3769f8669ee251c2cd403c6
Parents: df42160
Author: Gyula Fora <gy...@apache.org>
Authored: Tue Jun 30 13:48:17 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue Jun 30 14:29:11 2015 +0200
----------------------------------------------------------------------
.../streaming/runtime/tasks/OutputHandler.java | 1 +
.../api/state/StatefulOperatorTest.java | 35 ++++++++++++++++++++
2 files changed, 36 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fef9f115/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
index 2d2f29b..73f0a89 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
@@ -274,6 +274,7 @@ public class OutputHandler<OUT> {
@Override
public void collect(T record) {
try {
+ operator.getRuntimeContext().setNextInput(record);
operator.processElement(serializer.copy(record));
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/fef9f115/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
index af719f3..774b431 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
@@ -109,6 +109,11 @@ public class StatefulOperatorTest {
public void invoke(String value) throws Exception {}
});
+ keyedStream.map(new StatefulMapper2()).setParallelism(1).addSink(new SinkFunction<String>() {
+ private static final long serialVersionUID = 1L;
+ public void invoke(String value) throws Exception {}
+ });
+
try {
keyedStream.shuffle();
fail();
@@ -224,6 +229,36 @@ public class StatefulOperatorTest {
}
}
+ public static class StatefulMapper2 extends RichMapFunction<Integer, String> { // test mapper: counts elements in the "groupCounter" state and verifies the counts in close()
+ private static final long serialVersionUID = 1L;
+ OperatorState<Integer> groupCounter;
+ // map(): adds one to the current counter value, then returns the input's string form.
+ @Override
+ public String map(Integer value) throws Exception {
+ groupCounter.updateState(groupCounter.getState() + 1);
+ // Only the input is emitted; the counter value itself never leaves the operator.
+ return value.toString();
+ }
+ // open(): fetches the "groupCounter" state with default 0; the `true` flag presumably requests partitioned (per-key) state - TODO confirm against the runtime context API.
+ @Override
+ public void open(Configuration conf) throws IOException {
+ groupCounter = getRuntimeContext().getOperatorState("groupCounter", 0, true);
+ }
+ // close(): reads the state back through the runtime context and asserts a count of 2 for keys below 3 and 1 for every other key.
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public void close() throws Exception {
+ Map<String, StreamOperatorState> states = ((StreamingRuntimeContext) getRuntimeContext()).getOperatorStates();
+ PartitionedStreamOperatorState<Integer, Integer, Integer> groupCounter = (PartitionedStreamOperatorState<Integer, Integer, Integer>) states.get("groupCounter"); // NOTE(review): this local shadows the groupCounter field
+ for (Entry<Serializable, Integer> count : groupCounter.getPartitionedState().entrySet()) {
+ Integer key = (Integer) count.getKey();
+ Integer expected = key < 3 ? 2 : 1;
+ assertEquals(expected, count.getValue());
+ }
+ }
+
+ }
+
public static class ModKey implements KeySelector<Integer, Serializable> {
private static final long serialVersionUID = 4193026742083046736L;