You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/08/08 11:21:42 UTC
flink git commit: [FLINK-7385] Fix ArrayIndexOutOfBoundsException when object-reuse is enabled
Repository: flink
Updated Branches:
refs/heads/master 4dfefd042 -> 6f5fa7f74
[FLINK-7385] Fix ArrayIndexOutOfBoundsException when object-reuse is enabled
This closes #4496.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6f5fa7f7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6f5fa7f7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6f5fa7f7
Branch: refs/heads/master
Commit: 6f5fa7f741538207244368c275bee9958c43a25a
Parents: 4dfefd0
Author: Xpray <le...@gmail.com>
Authored: Tue Aug 8 16:18:26 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Aug 8 19:20:32 2017 +0800
----------------------------------------------------------------------
.../streaming/runtime/tasks/OperatorChain.java | 12 ++++++++----
.../streaming/api/StreamingOperatorsITCase.java | 17 +++++++++++++++++
2 files changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6f5fa7f7/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 0f29b73..b15f126 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -612,8 +612,10 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
output.collect(shallowCopy);
}
- // don't copy for the last output
- outputs[outputs.length - 1].collect(record);
+ if (outputs.length > 0) {
+ // don't copy for the last output
+ outputs[outputs.length - 1].collect(record);
+ }
}
@Override
@@ -625,8 +627,10 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
output.collect(outputTag, shallowCopy);
}
- // don't copy for the last output
- outputs[outputs.length - 1].collect(outputTag, record);
+ if (outputs.length > 0) {
+ // don't copy for the last output
+ outputs[outputs.length - 1].collect(outputTag, record);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6f5fa7f7/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
index 6d2f8c5..32a04fa 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
@@ -18,6 +18,7 @@
package org.apache.flink.test.streaming.api;
+import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
@@ -34,6 +35,8 @@ import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+
+import org.apache.flink.util.Collector;
import org.apache.flink.util.MathUtils;
import org.junit.Assert;
@@ -378,4 +381,18 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
collections.clear();
}
}
+
+ @Test
+ public void testOperatorChainWithObjectReuseAndNoOutputOperators() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
+ DataStream<Integer> input = env.fromElements(1, 2, 3);
+ input.flatMap(new FlatMapFunction<Integer, Integer>() {
+ @Override
+ public void flatMap(Integer value, Collector<Integer> out) throws Exception {
+ out.collect(value << 1);
+ }
+ });
+ env.execute();
+ }
}