You are viewing a plain text version of this content. The canonical, hyperlinked version is available in the Apache mailing list archive.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/08/08 11:21:42 UTC

flink git commit: [FLINK-7385] Fix ArrayIndexOutOfBoundsException when object-reuse is enabled

Repository: flink
Updated Branches:
  refs/heads/master 4dfefd042 -> 6f5fa7f74


[FLINK-7385] Fix ArrayIndexOutOfBoundsException when object-reuse is enabled

This closes #4496.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6f5fa7f7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6f5fa7f7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6f5fa7f7

Branch: refs/heads/master
Commit: 6f5fa7f741538207244368c275bee9958c43a25a
Parents: 4dfefd0
Author: Xpray <le...@gmail.com>
Authored: Tue Aug 8 16:18:26 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Aug 8 19:20:32 2017 +0800

----------------------------------------------------------------------
 .../streaming/runtime/tasks/OperatorChain.java     | 12 ++++++++----
 .../streaming/api/StreamingOperatorsITCase.java    | 17 +++++++++++++++++
 2 files changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6f5fa7f7/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 0f29b73..b15f126 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -612,8 +612,10 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 				output.collect(shallowCopy);
 			}
 
-			// don't copy for the last output
-			outputs[outputs.length - 1].collect(record);
+			if (outputs.length > 0) {
+				// don't copy for the last output
+				outputs[outputs.length - 1].collect(record);
+			}
 		}
 
 		@Override
@@ -625,8 +627,10 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 				output.collect(outputTag, shallowCopy);
 			}
 
-			// don't copy for the last output
-			outputs[outputs.length - 1].collect(outputTag, record);
+			if (outputs.length > 0) {
+				// don't copy for the last output
+				outputs[outputs.length - 1].collect(outputTag, record);
+			}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6f5fa7f7/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
index 6d2f8c5..32a04fa 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.streaming.api;
 
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
@@ -34,6 +35,8 @@ import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+
+import org.apache.flink.util.Collector;
 import org.apache.flink.util.MathUtils;
 
 import org.junit.Assert;
@@ -378,4 +381,18 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 			collections.clear();
 		}
 	}
+
+	@Test
+	public void testOperatorChainWithObjectReuseAndNoOutputOperators() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().enableObjectReuse();
+		DataStream<Integer> input = env.fromElements(1, 2, 3);
+		input.flatMap(new FlatMapFunction<Integer, Integer>() {
+			@Override
+			public void flatMap(Integer value, Collector<Integer> out) throws Exception {
+				out.collect(value << 1);
+			}
+		});
+		env.execute();
+	}
 }