You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/08/27 08:38:40 UTC
[flink] branch release-1.5 updated: [FLINK-10204] Fix serialization/copy of LatencyMarkers
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.5 by this push:
new 9ad1bff [FLINK-10204] Fix serialization/copy of LatencyMarkers
9ad1bff is described below
commit 9ad1bff35411bc18d0c43a7e0322fc59852f9729
Author: Ben La Monica <be...@gmail.com>
AuthorDate: Mon Aug 27 03:20:24 2018 -0500
[FLINK-10204] Fix serialization/copy of LatencyMarkers
---
.../apache/flink/streaming/runtime/streamrecord/LatencyMarker.java | 2 +-
.../flink/streaming/runtime/streamrecord/StreamElementSerializer.java | 3 ++-
.../streaming/runtime/streamrecord/StreamElementSerializerTest.java | 4 ++++
3 files changed, 7 insertions(+), 2 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
index 932e130..4074251 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
@@ -80,7 +80,7 @@ public final class LatencyMarker extends StreamElement {
if (markedTime != that.markedTime) {
return false;
}
- if (operatorId != that.operatorId) {
+ if (!operatorId.equals(that.operatorId)) {
return false;
}
return subtaskIndex == that.subtaskIndex;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
index ba92416..ed6022f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
@@ -156,7 +156,8 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
}
else if (tag == TAG_LATENCY_MARKER) {
target.writeLong(source.readLong());
- target.writeInt(source.readInt());
+ target.writeLong(source.readLong());
+ target.writeLong(source.readLong());
target.writeInt(source.readInt());
} else {
throw new IOException("Corrupt stream, found tag: " + tag);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java
index 4a99317..0e4e84b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.junit.Test;
@@ -89,6 +90,9 @@ public class StreamElementSerializerTest {
Watermark negativeWatermark = new Watermark(-4647654567676555876L);
assertEquals(negativeWatermark, serializeAndDeserialize(negativeWatermark, serializer));
+
+ LatencyMarker latencyMarker = new LatencyMarker(System.currentTimeMillis(), new OperatorID(-1, -1), 1);
+ assertEquals(latencyMarker, serializeAndDeserialize(latencyMarker, serializer));
}
@SuppressWarnings("unchecked")