You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/28 06:15:02 UTC

[flink] 02/04: [FLINK-17861][task][checkpointing] Merge channel state serializer and deserializer Motivation: 1. add a method that deserializes and then serializes data 2. simplify

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 53821fce63aa8086efb61ea76f12e756856dca4d
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Thu May 21 20:38:07 2020 +0200

    [FLINK-17861][task][checkpointing] Merge channel state serializer and deserializer
    Motivation:
    1. add a method that deserializes and then serializes data
    2. simplify
---
 .../checkpoint/channel/ChannelStateReaderImpl.java |  2 +-
 .../checkpoint/channel/ChannelStateSerializer.java |  5 +---
 .../channel/ChannelStateStreamReader.java          |  4 +--
 .../channel/RefCountingFSDataInputStream.java      | 10 ++++----
 .../channel/ChannelStateReaderImplTest.java        | 29 ++++++++--------------
 .../channel/ChannelStateSerializerImplTest.java    |  2 +-
 6 files changed, 20 insertions(+), 32 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateReaderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateReaderImpl.java
index 8c097ea..dbc5bb8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateReaderImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateReaderImpl.java
@@ -65,7 +65,7 @@ public class ChannelStateReaderImpl implements ChannelStateReader {
 		this(snapshot, new ChannelStateSerializerImpl());
 	}
 
-	ChannelStateReaderImpl(TaskStateSnapshot snapshot, ChannelStateDeserializer serializer) {
+	ChannelStateReaderImpl(TaskStateSnapshot snapshot, ChannelStateSerializer serializer) {
 		RefCountingFSDataInputStreamFactory streamFactory = new RefCountingFSDataInputStreamFactory(serializer);
 		final HashMap<InputChannelInfo, ChannelStateStreamReader> inputChannelHandleReadersTmp = new HashMap<>();
 		final HashMap<ResultSubpartitionInfo, ChannelStateStreamReader> resultSubpartitionHandleReadersTmp = new HashMap<>();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateSerializer.java
index 84e13d1..2d1184e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateSerializer.java
@@ -39,9 +39,6 @@ interface ChannelStateSerializer {
 	void writeHeader(DataOutputStream dataStream) throws IOException;
 
 	void writeData(DataOutputStream stream, Buffer... flinkBuffers) throws IOException;
-}
-
-interface ChannelStateDeserializer {
 
 	void readHeader(InputStream stream) throws IOException;
 
@@ -128,7 +125,7 @@ interface ChannelStateByteBuffer {
 	}
 }
 
-class ChannelStateSerializerImpl implements ChannelStateSerializer, ChannelStateDeserializer {
+class ChannelStateSerializerImpl implements ChannelStateSerializer {
 	private static final int SERIALIZATION_VERSION = 0;
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateStreamReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateStreamReader.java
index 6747750..edc2406 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateStreamReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateStreamReader.java
@@ -45,7 +45,7 @@ import static org.apache.flink.runtime.checkpoint.channel.ChannelStateReader.Rea
 class ChannelStateStreamReader implements Closeable {
 
 	private final RefCountingFSDataInputStream stream;
-	private final ChannelStateDeserializer serializer;
+	private final ChannelStateSerializer serializer;
 	private final Queue<Long> offsets;
 	private int remainingBytes = -1;
 	private boolean closed = false;
@@ -54,7 +54,7 @@ class ChannelStateStreamReader implements Closeable {
 		this(streamFactory.getOrCreate(handle), handle.getOffsets(), streamFactory.getSerializer());
 	}
 
-	private ChannelStateStreamReader(RefCountingFSDataInputStream stream, List<Long> offsets, ChannelStateDeserializer serializer) {
+	private ChannelStateStreamReader(RefCountingFSDataInputStream stream, List<Long> offsets, ChannelStateSerializer serializer) {
 		this.stream = stream;
 		this.stream.incRef();
 		this.serializer = serializer;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RefCountingFSDataInputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RefCountingFSDataInputStream.java
index 86181a2..5dd16b17 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RefCountingFSDataInputStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RefCountingFSDataInputStream.java
@@ -39,13 +39,13 @@ class RefCountingFSDataInputStream extends FSDataInputStream {
 
 	private final SupplierWithException<FSDataInputStream, IOException> streamSupplier;
 	private FSDataInputStream stream;
-	private final ChannelStateDeserializer serializer;
+	private final ChannelStateSerializer serializer;
 	private int refCount = 0;
 	private State state = State.NEW;
 
 	private RefCountingFSDataInputStream(
 			SupplierWithException<FSDataInputStream, IOException> streamSupplier,
-			ChannelStateDeserializer serializer) {
+			ChannelStateSerializer serializer) {
 		this.streamSupplier = checkNotNull(streamSupplier);
 		this.serializer = checkNotNull(serializer);
 	}
@@ -105,9 +105,9 @@ class RefCountingFSDataInputStream extends FSDataInputStream {
 	@NotThreadSafe
 	static class RefCountingFSDataInputStreamFactory {
 		private final Map<StreamStateHandle, RefCountingFSDataInputStream> streams = new HashMap<>(); // not clearing: expecting short life
-		private final ChannelStateDeserializer serializer;
+		private final ChannelStateSerializer serializer;
 
-		RefCountingFSDataInputStreamFactory(ChannelStateDeserializer serializer) {
+		RefCountingFSDataInputStreamFactory(ChannelStateSerializer serializer) {
 			this.serializer = checkNotNull(serializer);
 		}
 
@@ -121,7 +121,7 @@ class RefCountingFSDataInputStream extends FSDataInputStream {
 			return stream;
 		}
 
-		ChannelStateDeserializer getSerializer() {
+		ChannelStateSerializer getSerializer() {
 			return serializer;
 		}
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateReaderImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateReaderImplTest.java
index fcf90e8..fbe1ade 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateReaderImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateReaderImplTest.java
@@ -128,28 +128,19 @@ public class ChannelStateReaderImplTest {
 		return buf;
 	}
 
-	private ChannelStateDeserializer deserializer(byte[] data) {
-		return new ChannelStateDeserializer() {
-			@Override
-			public void readHeader(InputStream stream) {
-			}
-
-			@Override
-			public int readLength(InputStream stream) {
-				return data.length;
-			}
-
-			@Override
-			public int readData(InputStream stream, ChannelStateByteBuffer buffer, int bytes) throws IOException {
-				return buffer.writeBytes(stream, bytes);
-			}
-		};
-	}
-
 	private ChannelStateReaderImpl getReader(InputChannelInfo channel, byte[] data) {
 		return new ChannelStateReaderImpl(
 			taskStateSnapshot(singletonList(new InputChannelStateHandle(channel, new ByteStreamStateHandle("", data), singletonList(0L)))),
-			deserializer(DATA));
+			new ChannelStateSerializerImpl() {
+				@Override
+				public void readHeader(InputStream stream) {
+				}
+
+				@Override
+				public int readLength(InputStream stream) {
+					return data.length;
+				}
+			});
 	}
 
 	private void readAndVerify(int bufferSize, InputChannelInfo channelInfo, byte[] data, ChannelStateReader reader) throws IOException {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateSerializerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateSerializerImplTest.java
index e3a79cb..79a3b04 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateSerializerImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateSerializerImplTest.java
@@ -74,7 +74,7 @@ public class ChannelStateSerializerImplTest {
 		}
 		out.close();
 
-		ChannelStateDeserializer d = new ChannelStateSerializerImpl();
+		ChannelStateSerializer d = new ChannelStateSerializerImpl();
 		ByteArrayInputStream is = new ByteArrayInputStream(baos.toByteArray());
 		d.readHeader(is);
 		for (int count : numBuffersToWriteAtOnce) {