You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ta...@apache.org on 2021/12/28 14:04:07 UTC

[flink] branch release-1.14 updated: [FLINK-25446][state] Avoid improper sanity check on read bytes on DataInputStream#read(byte[])

This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 24016b8  [FLINK-25446][state] Avoid improper sanity check on read bytes on DataInputStream#read(byte[])
24016b8 is described below

commit 24016b83eec53ba7124d651201daa580323b80fc
Author: Yun Tang <my...@live.com>
AuthorDate: Mon Dec 27 11:50:04 2021 +0800

    [FLINK-25446][state] Avoid improper sanity check on read bytes on DataInputStream#read(byte[])
---
 .../main/java/org/apache/flink/changelog/fs/StateChangeFormat.java   | 4 ++--
 .../runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java      | 5 ++---
 2 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
index 246932e..ba04f59 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.state.changelog.StateChange;
 import org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,7 +45,6 @@ import java.util.TreeMap;
 import java.util.stream.Collectors;
 
 import static java.util.Comparator.comparing;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /** Serialization format for state changes. */
 @Internal
@@ -135,7 +135,7 @@ public class StateChangeFormat
             private StateChange readChange() throws IOException {
                 int size = input.readInt();
                 byte[] bytes = new byte[size];
-                checkState(size == input.read(bytes));
+                IOUtils.readFully(input, bytes, 0, size);
                 return new StateChange(keyGroup, bytes);
             }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
index b772232..7062598 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAcce
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.filesystem.RelativeFileStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.function.BiConsumerWithException;
 import org.apache.flink.util.function.BiFunctionWithException;
 
@@ -66,8 +67,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
  * Base (De)serializer for checkpoint metadata format version 2 and 3.
  *
@@ -457,7 +456,7 @@ public abstract class MetadataV2V3SerializerBase {
                 int keyGroup = dis.readInt();
                 int bytesSize = dis.readInt();
                 byte[] bytes = new byte[bytesSize];
-                checkState(bytesSize == dis.read(bytes));
+                IOUtils.readFully(dis, bytes, 0, bytesSize);
                 changes.add(new StateChange(keyGroup, bytes));
             }
             return new InMemoryChangelogStateHandle(changes, from, to, keyGroupRange);