You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ta...@apache.org on 2021/12/28 14:04:07 UTC
[flink] branch release-1.14 updated: [FLINK-25446][state] Avoid improper sanity check on read bytes on DataInputStream#read(byte[])
This is an automated email from the ASF dual-hosted git repository.
tangyun pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 24016b8 [FLINK-25446][state] Avoid improper sanity check on read bytes on DataInputStream#read(byte[])
24016b8 is described below
commit 24016b83eec53ba7124d651201daa580323b80fc
Author: Yun Tang <my...@live.com>
AuthorDate: Mon Dec 27 11:50:04 2021 +0800
[FLINK-25446][state] Avoid improper sanity check on read bytes on DataInputStream#read(byte[])
---
.../main/java/org/apache/flink/changelog/fs/StateChangeFormat.java | 4 ++--
.../runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java | 5 ++---
2 files changed, 4 insertions(+), 5 deletions(-)
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
index 246932e..ba04f59 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.state.changelog.StateChange;
import org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,7 +45,6 @@ import java.util.TreeMap;
import java.util.stream.Collectors;
import static java.util.Comparator.comparing;
-import static org.apache.flink.util.Preconditions.checkState;
/** Serialization format for state changes. */
@Internal
@@ -135,7 +135,7 @@ public class StateChangeFormat
private StateChange readChange() throws IOException {
int size = input.readInt();
byte[] bytes = new byte[size];
- checkState(size == input.read(bytes));
+ IOUtils.readFully(input, bytes, 0, size);
return new StateChange(keyGroup, bytes);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
index b772232..7062598 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAcce
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.filesystem.RelativeFileStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.util.IOUtils;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.BiFunctionWithException;
@@ -66,8 +67,6 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
-import static org.apache.flink.util.Preconditions.checkState;
-
/**
* Base (De)serializer for checkpoint metadata format version 2 and 3.
*
@@ -457,7 +456,7 @@ public abstract class MetadataV2V3SerializerBase {
int keyGroup = dis.readInt();
int bytesSize = dis.readInt();
byte[] bytes = new byte[bytesSize];
- checkState(bytesSize == dis.read(bytes));
+ IOUtils.readFully(dis, bytes, 0, bytesSize);
changes.add(new StateChange(keyGroup, bytes));
}
return new InMemoryChangelogStateHandle(changes, from, to, keyGroupRange);