You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2021/02/18 18:00:37 UTC

[flink] 02/02: [FLINK-20580][core] Does not accept null value for SerializedValue

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ed981be6601d3600d23b301d70d2bffff8aad3e6
Author: Kezhu Wang <ke...@gmail.com>
AuthorDate: Sat Feb 13 00:13:54 2021 +0800

    [FLINK-20580][core] Does not accept null value for SerializedValue
    
    This closes #14936.
---
 .../org/apache/flink/util/SerializedValue.java     | 42 +++++++++++++---------
 .../org/apache/flink/util/SerializedValueTest.java | 37 +++++++++++--------
 .../org/apache/flink/runtime/blob/BlobWriter.java  |  2 +-
 .../TaskExecutorOperatorEventHandlingTest.java     |  2 +-
 4 files changed, 51 insertions(+), 32 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/SerializedValue.java b/flink-core/src/main/java/org/apache/flink/util/SerializedValue.java
index fe0878d..ca3ccb0 100644
--- a/flink-core/src/main/java/org/apache/flink/util/SerializedValue.java
+++ b/flink-core/src/main/java/org/apache/flink/util/SerializedValue.java
@@ -20,8 +20,6 @@ package org.apache.flink.util;
 
 import org.apache.flink.annotation.Internal;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.Arrays;
 
@@ -43,35 +41,50 @@ public class SerializedValue<T> implements java.io.Serializable {
     private static final long serialVersionUID = -3564011643393683761L;
 
     /** The serialized data. */
-    @Nullable private final byte[] serializedData;
+    private final byte[] serializedData;
 
     private SerializedValue(byte[] serializedData) {
-        Preconditions.checkNotNull(serializedData, "Serialized data");
+        Preconditions.checkNotNull(serializedData, "Serialized data must not be null");
+        Preconditions.checkArgument(
+                serializedData.length != 0, "Serialized data must not be empty");
         this.serializedData = serializedData;
     }
 
+    /**
+     * Constructs a serialized value.
+     *
+     * @param value value to serialize
+     * @throws NullPointerException if value is null
+     * @throws IOException exception during serialization
+     */
     public SerializedValue(T value) throws IOException {
-        this.serializedData = value == null ? null : InstantiationUtil.serializeObject(value);
+        Preconditions.checkNotNull(value, "Value must not be null");
+        this.serializedData = InstantiationUtil.serializeObject(value);
     }
 
-    @SuppressWarnings("unchecked")
     public T deserializeValue(ClassLoader loader) throws IOException, ClassNotFoundException {
         Preconditions.checkNotNull(loader, "No classloader has been passed");
-        return serializedData == null
-                ? null
-                : (T) InstantiationUtil.deserializeObject(serializedData, loader);
+        return InstantiationUtil.deserializeObject(serializedData, loader);
     }
 
     /**
-     * Returns the serialized value or <code>null</code> if no value is set.
+     * Returns byte array for serialized data.
      *
      * @return Serialized data.
      */
-    @Nullable
     public byte[] getByteArray() {
         return serializedData;
     }
 
+    /**
+     * Constructs serialized value from serialized data.
+     *
+     * @param serializedData serialized data
+     * @param <T> type
+     * @return serialized value
+     * @throws NullPointerException if serialized data is null
+     * @throws IllegalArgumentException if serialized data is empty
+     */
     public static <T> SerializedValue<T> fromBytes(byte[] serializedData) {
         return new SerializedValue<>(serializedData);
     }
@@ -80,17 +93,14 @@ public class SerializedValue<T> implements java.io.Serializable {
 
     @Override
     public int hashCode() {
-        return serializedData == null ? 0 : Arrays.hashCode(serializedData);
+        return Arrays.hashCode(serializedData);
     }
 
     @Override
     public boolean equals(Object obj) {
         if (obj instanceof SerializedValue) {
             SerializedValue<?> other = (SerializedValue<?>) obj;
-            return this.serializedData == null
-                    ? other.serializedData == null
-                    : (other.serializedData != null
-                            && Arrays.equals(this.serializedData, other.serializedData));
+            return Arrays.equals(this.serializedData, other.serializedData);
         } else {
             return false;
         }
diff --git a/flink-core/src/test/java/org/apache/flink/util/SerializedValueTest.java b/flink-core/src/test/java/org/apache/flink/util/SerializedValueTest.java
index c2ce771..011d44f 100644
--- a/flink-core/src/test/java/org/apache/flink/util/SerializedValueTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/SerializedValueTest.java
@@ -22,9 +22,12 @@ import org.apache.flink.core.testutils.CommonTestUtils;
 
 import org.junit.Test;
 
+import java.util.Arrays;
+
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
 /** Tests for the {@link SerializedValue}. */
@@ -47,26 +50,32 @@ public class SerializedValueTest {
             assertNotNull(v.toString());
             assertNotNull(copy.toString());
 
+            assertNotEquals(0, v.getByteArray().length);
+            assertArrayEquals(v.getByteArray(), copy.getByteArray());
+
+            byte[] bytes = v.getByteArray();
+            SerializedValue<String> saved =
+                    SerializedValue.fromBytes(Arrays.copyOf(bytes, bytes.length));
+            assertEquals(v, saved);
+            assertArrayEquals(v.getByteArray(), saved.getByteArray());
         } catch (Exception e) {
             e.printStackTrace();
             fail(e.getMessage());
         }
     }
 
-    @Test
-    public void testNullValue() {
-        try {
-            SerializedValue<Object> v = new SerializedValue<>(null);
-            SerializedValue<Object> copy = CommonTestUtils.createCopySerializable(v);
+    @Test(expected = NullPointerException.class)
+    public void testNullValue() throws Exception {
+        new SerializedValue<>(null);
+    }
 
-            assertNull(copy.deserializeValue(getClass().getClassLoader()));
+    @Test(expected = NullPointerException.class)
+    public void testFromNullBytes() {
+        SerializedValue.fromBytes(null);
+    }
 
-            assertEquals(v, copy);
-            assertEquals(v.hashCode(), copy.hashCode());
-            assertEquals(v.toString(), copy.toString());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+    @Test(expected = IllegalArgumentException.class)
+    public void testFromEmptyBytes() {
+        SerializedValue.fromBytes(new byte[0]);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
index 242b573..7159348 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
@@ -85,7 +85,7 @@ public interface BlobWriter {
         final SerializedValue<T> serializedValue = new SerializedValue<>(value);
 
         if (serializedValue.getByteArray().length < blobWriter.getMinOffloadingSize()) {
-            return Either.Left(new SerializedValue<>(value));
+            return Either.Left(serializedValue);
         } else {
             try {
                 final PermanentBlobKey permanentBlobKey =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventHandlingTest.java
index f9d5c29..f84d513 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventHandlingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventHandlingTest.java
@@ -93,7 +93,7 @@ public class TaskExecutorOperatorEventHandlingTest extends TestLogger {
             final TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
             final CompletableFuture<?> resultFuture =
                     tmGateway.sendOperatorEventToTask(
-                            eid, new OperatorID(), new SerializedValue<>(null));
+                            eid, new OperatorID(), new SerializedValue<>(new TestOperatorEvent()));
 
             assertThat(
                     resultFuture,