You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/06/30 14:39:46 UTC

flink git commit: [FLINK-2294] [streaming] Fix partitioned state next-input setting for copying chained collectors

Repository: flink
Updated Branches:
  refs/heads/master df4216083 -> fef9f1158


[FLINK-2294] [streaming] Fix partitioned state next-input setting for copying chained collectors


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fef9f115
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fef9f115
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fef9f115

Branch: refs/heads/master
Commit: fef9f115838b3ba3d3769f8669ee251c2cd403c6
Parents: df42160
Author: Gyula Fora <gy...@apache.org>
Authored: Tue Jun 30 13:48:17 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue Jun 30 14:29:11 2015 +0200

----------------------------------------------------------------------
 .../streaming/runtime/tasks/OutputHandler.java  |  1 +
 .../api/state/StatefulOperatorTest.java         | 35 ++++++++++++++++++++
 2 files changed, 36 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fef9f115/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
index 2d2f29b..73f0a89 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
@@ -274,6 +274,7 @@ public class OutputHandler<OUT> {
 		@Override
 		public void collect(T record) {
 			try {
+				operator.getRuntimeContext().setNextInput(record);
 				operator.processElement(serializer.copy(record));
 			} catch (Exception e) {
 				if (LOG.isErrorEnabled()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/fef9f115/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
index af719f3..774b431 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
@@ -109,6 +109,11 @@ public class StatefulOperatorTest {
 			public void invoke(String value) throws Exception {}
 		});
 		
+		keyedStream.map(new StatefulMapper2()).setParallelism(1).addSink(new SinkFunction<String>() {
+			private static final long serialVersionUID = 1L;
+			public void invoke(String value) throws Exception {}
+		});
+		
 		try {
 			keyedStream.shuffle();
 			fail();
@@ -224,6 +229,36 @@ public class StatefulOperatorTest {
 		}
 	}
 	
+	public static class StatefulMapper2 extends RichMapFunction<Integer, String> {
+		private static final long serialVersionUID = 1L;
+		OperatorState<Integer> groupCounter;
+		
+		@Override
+		public String map(Integer value) throws Exception {
+			groupCounter.updateState(groupCounter.getState() + 1);
+			
+			return value.toString();
+		}
+
+		@Override
+		public void open(Configuration conf) throws IOException {		
+			groupCounter = getRuntimeContext().getOperatorState("groupCounter", 0, true);
+		}
+		
+		@SuppressWarnings({ "rawtypes", "unchecked" })
+		@Override
+		public void close() throws Exception {
+			Map<String, StreamOperatorState> states = ((StreamingRuntimeContext) getRuntimeContext()).getOperatorStates();
+			PartitionedStreamOperatorState<Integer, Integer, Integer> groupCounter = (PartitionedStreamOperatorState<Integer, Integer, Integer>) states.get("groupCounter");
+			for (Entry<Serializable, Integer> count : groupCounter.getPartitionedState().entrySet()) {
+				Integer key = (Integer) count.getKey();
+				Integer expected = key < 3 ? 2 : 1;
+				assertEquals(expected, count.getValue());
+			}
+		}
+		
+	}
+	
 	public static class ModKey implements KeySelector<Integer, Serializable> {
 
 		private static final long serialVersionUID = 4193026742083046736L;