You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/23 15:45:09 UTC

[2/3] flink git commit: [hotfix] Fix broken copy in OperatorChain

[hotfix] Fix broken copy in OperatorChain

Before, the StreamRecord itself was not copied; now it is.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/256a88a1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/256a88a1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/256a88a1

Branch: refs/heads/master
Commit: 256a88a186dd7fe2a958d4bbc826fee3b806efc6
Parents: d056f11
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 23 14:38:16 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Oct 23 15:37:55 2015 +0200

----------------------------------------------------------------------
 .../flink/streaming/runtime/tasks/OperatorChain.java   | 13 +++++--------
 1 file changed, 5 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/256a88a1/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index b42b888..ac27093 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -301,22 +301,19 @@ public class OperatorChain<OUT> {
 		
 		private final TypeSerializer<T> serializer;
 		
-		private final StreamRecord<T> copyRecord;
-
 		public CopyingChainingOutput(OneInputStreamOperator<T, ?> operator, TypeSerializer<T> serializer) {
 			super(operator);
 			this.serializer = serializer;
-			this.copyRecord = new StreamRecord<T>(null, 0L);
 		}
 
 		@Override
 		public void collect(StreamRecord<T> record) {
 			try {
-				T copy = serializer.copy(record.getValue());
-				copyRecord.replace(copy, record.getTimestamp());
-				
-				operator.setKeyContextElement(copyRecord);
-				operator.processElement(copyRecord);
+
+				StreamRecord<T> copy = new StreamRecord<>(serializer.copy(record.getValue()), record.getTimestamp());
+
+				operator.setKeyContextElement(copy);
+				operator.processElement(copy);
 			}
 			catch (Exception e) {
 				throw new RuntimeException("Could not forward element to next operator", e);