You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/28 06:15:02 UTC
[flink] 02/04: [FLINK-17861][task][checkpointing] Merge channel
state serializer and deserializer Motivation: 1. add a method that
deserializes and then serializes data 2. simplify
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 53821fce63aa8086efb61ea76f12e756856dca4d
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Thu May 21 20:38:07 2020 +0200
[FLINK-17861][task][checkpointing] Merge channel state serializer and deserializer
Motivation:
1. add a method that deserializes and then serializes data
2. simplify
---
.../checkpoint/channel/ChannelStateReaderImpl.java | 2 +-
.../checkpoint/channel/ChannelStateSerializer.java | 5 +---
.../channel/ChannelStateStreamReader.java | 4 +--
.../channel/RefCountingFSDataInputStream.java | 10 ++++----
.../channel/ChannelStateReaderImplTest.java | 29 ++++++++--------------
.../channel/ChannelStateSerializerImplTest.java | 2 +-
6 files changed, 20 insertions(+), 32 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateReaderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateReaderImpl.java
index 8c097ea..dbc5bb8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateReaderImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateReaderImpl.java
@@ -65,7 +65,7 @@ public class ChannelStateReaderImpl implements ChannelStateReader {
this(snapshot, new ChannelStateSerializerImpl());
}
- ChannelStateReaderImpl(TaskStateSnapshot snapshot, ChannelStateDeserializer serializer) {
+ ChannelStateReaderImpl(TaskStateSnapshot snapshot, ChannelStateSerializer serializer) {
RefCountingFSDataInputStreamFactory streamFactory = new RefCountingFSDataInputStreamFactory(serializer);
final HashMap<InputChannelInfo, ChannelStateStreamReader> inputChannelHandleReadersTmp = new HashMap<>();
final HashMap<ResultSubpartitionInfo, ChannelStateStreamReader> resultSubpartitionHandleReadersTmp = new HashMap<>();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateSerializer.java
index 84e13d1..2d1184e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateSerializer.java
@@ -39,9 +39,6 @@ interface ChannelStateSerializer {
void writeHeader(DataOutputStream dataStream) throws IOException;
void writeData(DataOutputStream stream, Buffer... flinkBuffers) throws IOException;
-}
-
-interface ChannelStateDeserializer {
void readHeader(InputStream stream) throws IOException;
@@ -128,7 +125,7 @@ interface ChannelStateByteBuffer {
}
}
-class ChannelStateSerializerImpl implements ChannelStateSerializer, ChannelStateDeserializer {
+class ChannelStateSerializerImpl implements ChannelStateSerializer {
private static final int SERIALIZATION_VERSION = 0;
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateStreamReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateStreamReader.java
index 6747750..edc2406 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateStreamReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateStreamReader.java
@@ -45,7 +45,7 @@ import static org.apache.flink.runtime.checkpoint.channel.ChannelStateReader.Rea
class ChannelStateStreamReader implements Closeable {
private final RefCountingFSDataInputStream stream;
- private final ChannelStateDeserializer serializer;
+ private final ChannelStateSerializer serializer;
private final Queue<Long> offsets;
private int remainingBytes = -1;
private boolean closed = false;
@@ -54,7 +54,7 @@ class ChannelStateStreamReader implements Closeable {
this(streamFactory.getOrCreate(handle), handle.getOffsets(), streamFactory.getSerializer());
}
- private ChannelStateStreamReader(RefCountingFSDataInputStream stream, List<Long> offsets, ChannelStateDeserializer serializer) {
+ private ChannelStateStreamReader(RefCountingFSDataInputStream stream, List<Long> offsets, ChannelStateSerializer serializer) {
this.stream = stream;
this.stream.incRef();
this.serializer = serializer;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RefCountingFSDataInputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RefCountingFSDataInputStream.java
index 86181a2..5dd16b17 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RefCountingFSDataInputStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RefCountingFSDataInputStream.java
@@ -39,13 +39,13 @@ class RefCountingFSDataInputStream extends FSDataInputStream {
private final SupplierWithException<FSDataInputStream, IOException> streamSupplier;
private FSDataInputStream stream;
- private final ChannelStateDeserializer serializer;
+ private final ChannelStateSerializer serializer;
private int refCount = 0;
private State state = State.NEW;
private RefCountingFSDataInputStream(
SupplierWithException<FSDataInputStream, IOException> streamSupplier,
- ChannelStateDeserializer serializer) {
+ ChannelStateSerializer serializer) {
this.streamSupplier = checkNotNull(streamSupplier);
this.serializer = checkNotNull(serializer);
}
@@ -105,9 +105,9 @@ class RefCountingFSDataInputStream extends FSDataInputStream {
@NotThreadSafe
static class RefCountingFSDataInputStreamFactory {
private final Map<StreamStateHandle, RefCountingFSDataInputStream> streams = new HashMap<>(); // not clearing: expecting short life
- private final ChannelStateDeserializer serializer;
+ private final ChannelStateSerializer serializer;
- RefCountingFSDataInputStreamFactory(ChannelStateDeserializer serializer) {
+ RefCountingFSDataInputStreamFactory(ChannelStateSerializer serializer) {
this.serializer = checkNotNull(serializer);
}
@@ -121,7 +121,7 @@ class RefCountingFSDataInputStream extends FSDataInputStream {
return stream;
}
- ChannelStateDeserializer getSerializer() {
+ ChannelStateSerializer getSerializer() {
return serializer;
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateReaderImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateReaderImplTest.java
index fcf90e8..fbe1ade 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateReaderImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateReaderImplTest.java
@@ -128,28 +128,19 @@ public class ChannelStateReaderImplTest {
return buf;
}
- private ChannelStateDeserializer deserializer(byte[] data) {
- return new ChannelStateDeserializer() {
- @Override
- public void readHeader(InputStream stream) {
- }
-
- @Override
- public int readLength(InputStream stream) {
- return data.length;
- }
-
- @Override
- public int readData(InputStream stream, ChannelStateByteBuffer buffer, int bytes) throws IOException {
- return buffer.writeBytes(stream, bytes);
- }
- };
- }
-
private ChannelStateReaderImpl getReader(InputChannelInfo channel, byte[] data) {
return new ChannelStateReaderImpl(
taskStateSnapshot(singletonList(new InputChannelStateHandle(channel, new ByteStreamStateHandle("", data), singletonList(0L)))),
- deserializer(DATA));
+ new ChannelStateSerializerImpl() {
+ @Override
+ public void readHeader(InputStream stream) {
+ }
+
+ @Override
+ public int readLength(InputStream stream) {
+ return data.length;
+ }
+ });
}
private void readAndVerify(int bufferSize, InputChannelInfo channelInfo, byte[] data, ChannelStateReader reader) throws IOException {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateSerializerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateSerializerImplTest.java
index e3a79cb..79a3b04 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateSerializerImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateSerializerImplTest.java
@@ -74,7 +74,7 @@ public class ChannelStateSerializerImplTest {
}
out.close();
- ChannelStateDeserializer d = new ChannelStateSerializerImpl();
+ ChannelStateSerializer d = new ChannelStateSerializerImpl();
ByteArrayInputStream is = new ByteArrayInputStream(baos.toByteArray());
d.readHeader(is);
for (int count : numBuffersToWriteAtOnce) {