You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/08/27 08:38:40 UTC

[flink] branch release-1.5 updated: [FLINK-10204] Fix serialization/copy of LatencyMarkers

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.5 by this push:
     new 9ad1bff  [FLINK-10204] Fix serialization/copy of LatencyMarkers
9ad1bff is described below

commit 9ad1bff35411bc18d0c43a7e0322fc59852f9729
Author: Ben La Monica <be...@gmail.com>
AuthorDate: Mon Aug 27 03:20:24 2018 -0500

    [FLINK-10204] Fix serialization/copy of LatencyMarkers
---
 .../apache/flink/streaming/runtime/streamrecord/LatencyMarker.java    | 2 +-
 .../flink/streaming/runtime/streamrecord/StreamElementSerializer.java | 3 ++-
 .../streaming/runtime/streamrecord/StreamElementSerializerTest.java   | 4 ++++
 3 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
index 932e130..4074251 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
@@ -80,7 +80,7 @@ public final class LatencyMarker extends StreamElement {
 		if (markedTime != that.markedTime) {
 			return false;
 		}
-		if (operatorId != that.operatorId) {
+		if (!operatorId.equals(that.operatorId)) {
 			return false;
 		}
 		return subtaskIndex == that.subtaskIndex;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
index ba92416..ed6022f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
@@ -156,7 +156,8 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
 		}
 		else if (tag == TAG_LATENCY_MARKER) {
 			target.writeLong(source.readLong());
-			target.writeInt(source.readInt());
+			target.writeLong(source.readLong());
+			target.writeLong(source.readLong());
 			target.writeInt(source.readInt());
 		} else {
 			throw new IOException("Corrupt stream, found tag: " + tag);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java
index 4a99317..0e4e84b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.streaming.api.watermark.Watermark;
 
 import org.junit.Test;
@@ -89,6 +90,9 @@ public class StreamElementSerializerTest {
 
 		Watermark negativeWatermark = new Watermark(-4647654567676555876L);
 		assertEquals(negativeWatermark, serializeAndDeserialize(negativeWatermark, serializer));
+
+		LatencyMarker latencyMarker = new LatencyMarker(System.currentTimeMillis(), new OperatorID(-1, -1), 1);
+		assertEquals(latencyMarker, serializeAndDeserialize(latencyMarker, serializer));
 	}
 
 	@SuppressWarnings("unchecked")