You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2021/02/18 18:00:37 UTC
[flink] 02/02: [FLINK-20580][core] Does not accept null value for SerializedValue
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit ed981be6601d3600d23b301d70d2bffff8aad3e6
Author: Kezhu Wang <ke...@gmail.com>
AuthorDate: Sat Feb 13 00:13:54 2021 +0800
[FLINK-20580][core] Does not accept null value for SerializedValue
This closes #14936.
---
.../org/apache/flink/util/SerializedValue.java | 42 +++++++++++++---------
.../org/apache/flink/util/SerializedValueTest.java | 37 +++++++++++--------
.../org/apache/flink/runtime/blob/BlobWriter.java | 2 +-
.../TaskExecutorOperatorEventHandlingTest.java | 2 +-
4 files changed, 51 insertions(+), 32 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/util/SerializedValue.java b/flink-core/src/main/java/org/apache/flink/util/SerializedValue.java
index fe0878d..ca3ccb0 100644
--- a/flink-core/src/main/java/org/apache/flink/util/SerializedValue.java
+++ b/flink-core/src/main/java/org/apache/flink/util/SerializedValue.java
@@ -20,8 +20,6 @@ package org.apache.flink.util;
import org.apache.flink.annotation.Internal;
-import javax.annotation.Nullable;
-
import java.io.IOException;
import java.util.Arrays;
@@ -43,35 +41,50 @@ public class SerializedValue<T> implements java.io.Serializable {
private static final long serialVersionUID = -3564011643393683761L;
/** The serialized data. */
- @Nullable private final byte[] serializedData;
+ private final byte[] serializedData;
private SerializedValue(byte[] serializedData) {
- Preconditions.checkNotNull(serializedData, "Serialized data");
+ Preconditions.checkNotNull(serializedData, "Serialized data must not be null");
+ Preconditions.checkArgument(
+ serializedData.length != 0, "Serialized data must not be empty");
this.serializedData = serializedData;
}
+ /**
+ * Constructs a serialized value.
+ *
+ * @param value value to serialize
+ * @throws NullPointerException if value is null
+ * @throws IOException exception during serialization
+ */
public SerializedValue(T value) throws IOException {
- this.serializedData = value == null ? null : InstantiationUtil.serializeObject(value);
+ Preconditions.checkNotNull(value, "Value must not be null");
+ this.serializedData = InstantiationUtil.serializeObject(value);
}
- @SuppressWarnings("unchecked")
public T deserializeValue(ClassLoader loader) throws IOException, ClassNotFoundException {
Preconditions.checkNotNull(loader, "No classloader has been passed");
- return serializedData == null
- ? null
- : (T) InstantiationUtil.deserializeObject(serializedData, loader);
+ return InstantiationUtil.deserializeObject(serializedData, loader);
}
/**
- * Returns the serialized value or <code>null</code> if no value is set.
+ * Returns byte array for serialized data.
*
* @return Serialized data.
*/
- @Nullable
public byte[] getByteArray() {
return serializedData;
}
+ /**
+ * Constructs serialized value from serialized data.
+ *
+ * @param serializedData serialized data
+ * @param <T> type
+ * @return serialized value
+ * @throws NullPointerException if serialized data is null
+ * @throws IllegalArgumentException if serialized data is empty
+ */
public static <T> SerializedValue<T> fromBytes(byte[] serializedData) {
return new SerializedValue<>(serializedData);
}
@@ -80,17 +93,14 @@ public class SerializedValue<T> implements java.io.Serializable {
@Override
public int hashCode() {
- return serializedData == null ? 0 : Arrays.hashCode(serializedData);
+ return Arrays.hashCode(serializedData);
}
@Override
public boolean equals(Object obj) {
if (obj instanceof SerializedValue) {
SerializedValue<?> other = (SerializedValue<?>) obj;
- return this.serializedData == null
- ? other.serializedData == null
- : (other.serializedData != null
- && Arrays.equals(this.serializedData, other.serializedData));
+ return Arrays.equals(this.serializedData, other.serializedData);
} else {
return false;
}
diff --git a/flink-core/src/test/java/org/apache/flink/util/SerializedValueTest.java b/flink-core/src/test/java/org/apache/flink/util/SerializedValueTest.java
index c2ce771..011d44f 100644
--- a/flink-core/src/test/java/org/apache/flink/util/SerializedValueTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/SerializedValueTest.java
@@ -22,9 +22,12 @@ import org.apache.flink.core.testutils.CommonTestUtils;
import org.junit.Test;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
/** Tests for the {@link SerializedValue}. */
@@ -47,26 +50,32 @@ public class SerializedValueTest {
assertNotNull(v.toString());
assertNotNull(copy.toString());
+ assertNotEquals(0, v.getByteArray().length);
+ assertArrayEquals(v.getByteArray(), copy.getByteArray());
+
+ byte[] bytes = v.getByteArray();
+ SerializedValue<String> saved =
+ SerializedValue.fromBytes(Arrays.copyOf(bytes, bytes.length));
+ assertEquals(v, saved);
+ assertArrayEquals(v.getByteArray(), saved.getByteArray());
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
- @Test
- public void testNullValue() {
- try {
- SerializedValue<Object> v = new SerializedValue<>(null);
- SerializedValue<Object> copy = CommonTestUtils.createCopySerializable(v);
+ @Test(expected = NullPointerException.class)
+ public void testNullValue() throws Exception {
+ new SerializedValue<>(null);
+ }
- assertNull(copy.deserializeValue(getClass().getClassLoader()));
+ @Test(expected = NullPointerException.class)
+ public void testFromNullBytes() {
+ SerializedValue.fromBytes(null);
+ }
- assertEquals(v, copy);
- assertEquals(v.hashCode(), copy.hashCode());
- assertEquals(v.toString(), copy.toString());
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ @Test(expected = IllegalArgumentException.class)
+ public void testFromEmptyBytes() {
+ SerializedValue.fromBytes(new byte[0]);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
index 242b573..7159348 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
@@ -85,7 +85,7 @@ public interface BlobWriter {
final SerializedValue<T> serializedValue = new SerializedValue<>(value);
if (serializedValue.getByteArray().length < blobWriter.getMinOffloadingSize()) {
- return Either.Left(new SerializedValue<>(value));
+ return Either.Left(serializedValue);
} else {
try {
final PermanentBlobKey permanentBlobKey =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventHandlingTest.java
index f9d5c29..f84d513 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventHandlingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventHandlingTest.java
@@ -93,7 +93,7 @@ public class TaskExecutorOperatorEventHandlingTest extends TestLogger {
final TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
final CompletableFuture<?> resultFuture =
tmGateway.sendOperatorEventToTask(
- eid, new OperatorID(), new SerializedValue<>(null));
+ eid, new OperatorID(), new SerializedValue<>(new TestOperatorEvent()));
assertThat(
resultFuture,