You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/23 15:45:09 UTC
[2/3] flink git commit: [hotfix] Fix broken copy in OperatorChain
[hotfix] Fix broken copy in OperatorChain
Before, the StreamRecords was not copied, now it is.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/256a88a1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/256a88a1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/256a88a1
Branch: refs/heads/master
Commit: 256a88a186dd7fe2a958d4bbc826fee3b806efc6
Parents: d056f11
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 23 14:38:16 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Oct 23 15:37:55 2015 +0200
----------------------------------------------------------------------
.../flink/streaming/runtime/tasks/OperatorChain.java | 13 +++++--------
1 file changed, 5 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/256a88a1/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index b42b888..ac27093 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -301,22 +301,19 @@ public class OperatorChain<OUT> {
private final TypeSerializer<T> serializer;
- private final StreamRecord<T> copyRecord;
-
public CopyingChainingOutput(OneInputStreamOperator<T, ?> operator, TypeSerializer<T> serializer) {
super(operator);
this.serializer = serializer;
- this.copyRecord = new StreamRecord<T>(null, 0L);
}
@Override
public void collect(StreamRecord<T> record) {
try {
- T copy = serializer.copy(record.getValue());
- copyRecord.replace(copy, record.getTimestamp());
-
- operator.setKeyContextElement(copyRecord);
- operator.processElement(copyRecord);
+
+ StreamRecord<T> copy = new StreamRecord<>(serializer.copy(record.getValue()), record.getTimestamp());
+
+ operator.setKeyContextElement(copy);
+ operator.processElement(copy);
}
catch (Exception e) {
throw new RuntimeException("Could not forward element to next operator", e);