You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2021/02/18 18:00:35 UTC

[flink] branch master updated (4c402a3 -> ed981be)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 4c402a3  [FLINK-19503][state] Add StateChangelog API
     new bbdd769  [FLINK-20580][rpc] Separate wire value class from user values
     new ed981be  [FLINK-20580][core] Does not accept null value for SerializedValue

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/flink/util/SerializedValue.java     |  42 ++++---
 .../org/apache/flink/util/SerializedValueTest.java |  37 +++---
 .../org/apache/flink/runtime/blob/BlobWriter.java  |   2 +-
 .../runtime/rpc/akka/AkkaInvocationHandler.java    |   5 +-
 .../flink/runtime/rpc/akka/AkkaRpcActor.java       |  18 ++-
 .../runtime/rpc/akka/AkkaRpcSerializedValue.java   |  88 +++++++++++++++
 .../flink/runtime/rpc/akka/AkkaRpcActorTest.java   |  60 ++++++++++
 .../rpc/akka/AkkaRpcSerializedValueTest.java       | 125 +++++++++++++++++++++
 .../runtime/rpc/akka/RemoteAkkaRpcActorTest.java   |  28 +++++
 .../TaskExecutorOperatorEventHandlingTest.java     |   2 +-
 10 files changed, 362 insertions(+), 45 deletions(-)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSerializedValue.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSerializedValueTest.java


[flink] 01/02: [FLINK-20580][rpc] Separate wire value class from user values

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bbdd769253b25b4093a1759c835c6ff1d99d390d
Author: Kezhu Wang <ke...@gmail.com>
AuthorDate: Sat Feb 13 00:06:53 2021 +0800

    [FLINK-20580][rpc] Separate wire value class from user values
---
 .../runtime/rpc/akka/AkkaInvocationHandler.java    |   5 +-
 .../flink/runtime/rpc/akka/AkkaRpcActor.java       |  18 ++-
 .../runtime/rpc/akka/AkkaRpcSerializedValue.java   |  88 +++++++++++++++
 .../flink/runtime/rpc/akka/AkkaRpcActorTest.java   |  60 ++++++++++
 .../rpc/akka/AkkaRpcSerializedValueTest.java       | 125 +++++++++++++++++++++
 .../runtime/rpc/akka/RemoteAkkaRpcActorTest.java   |  28 +++++
 6 files changed, 311 insertions(+), 13 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index a6d6a4b..5ec3f1d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -34,7 +34,6 @@ import org.apache.flink.runtime.rpc.messages.RpcInvocation;
 import org.apache.flink.runtime.rpc.messages.RunAsync;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.SerializedValue;
 
 import akka.actor.ActorRef;
 import akka.pattern.Patterns;
@@ -389,9 +388,9 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
     }
 
     static Object deserializeValueIfNeeded(Object o, Method method) {
-        if (o instanceof SerializedValue) {
+        if (o instanceof AkkaRpcSerializedValue) {
             try {
-                return ((SerializedValue<?>) o)
+                return ((AkkaRpcSerializedValue) o)
                         .deserializeValue(AkkaInvocationHandler.class.getClassLoader());
             } catch (IOException | ClassNotFoundException e) {
                 throw new CompletionException(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 08fe9b5..b0a2a82 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -36,7 +36,6 @@ import org.apache.flink.runtime.rpc.messages.RunAsync;
 import org.apache.flink.types.Either;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.SerializedValue;
 
 import akka.actor.AbstractActor;
 import akka.actor.ActorRef;
@@ -332,7 +331,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
 
     private void sendSyncResponse(Object response, String methodName) {
         if (isRemoteSender(getSender())) {
-            Either<SerializedValue<?>, AkkaRpcException> serializedResult =
+            Either<AkkaRpcSerializedValue, AkkaRpcException> serializedResult =
                     serializeRemoteResultAndVerifySize(response, methodName);
 
             if (serializedResult.isLeft()) {
@@ -356,8 +355,10 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
                                 promise.failure(throwable);
                             } else {
                                 if (isRemoteSender(sender)) {
-                                    Either<SerializedValue<?>, AkkaRpcException> serializedResult =
-                                            serializeRemoteResultAndVerifySize(value, methodName);
+                                    Either<AkkaRpcSerializedValue, AkkaRpcException>
+                                            serializedResult =
+                                                    serializeRemoteResultAndVerifySize(
+                                                            value, methodName);
 
                                     if (serializedResult.isLeft()) {
                                         promise.success(serializedResult.left());
@@ -380,15 +381,12 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
         return !sender.path().address().hasLocalScope();
     }
 
-    private Either<SerializedValue<?>, AkkaRpcException> serializeRemoteResultAndVerifySize(
+    private Either<AkkaRpcSerializedValue, AkkaRpcException> serializeRemoteResultAndVerifySize(
             Object result, String methodName) {
         try {
-            SerializedValue<?> serializedResult = new SerializedValue<>(result);
+            AkkaRpcSerializedValue serializedResult = AkkaRpcSerializedValue.valueOf(result);
 
-            long resultSize =
-                    serializedResult.getByteArray() == null
-                            ? 0
-                            : serializedResult.getByteArray().length;
+            long resultSize = serializedResult.getSerializedDataLength();
             if (resultSize > maximumFramesize) {
                 return Either.Right(
                         new AkkaRpcException(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSerializedValue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSerializedValue.java
new file mode 100644
index 0000000..fb8011c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSerializedValue.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+
+/** A self-contained serialized value to decouple from user values and transfer on wire. */
+final class AkkaRpcSerializedValue implements Serializable {
+    private static final long serialVersionUID = -4388571068440835689L;
+
+    @Nullable private final byte[] serializedData;
+
+    private AkkaRpcSerializedValue(@Nullable byte[] serializedData) {
+        this.serializedData = serializedData;
+    }
+
+    @Nullable
+    public byte[] getSerializedData() {
+        return serializedData;
+    }
+
+    /** Return length of serialized data, zero if no serialized data. */
+    public int getSerializedDataLength() {
+        return serializedData == null ? 0 : serializedData.length;
+    }
+
+    @Nullable
+    public <T> T deserializeValue(ClassLoader loader) throws IOException, ClassNotFoundException {
+        Preconditions.checkNotNull(loader, "No classloader has been passed");
+        return serializedData == null
+                ? null
+                : InstantiationUtil.deserializeObject(serializedData, loader);
+    }
+
+    /**
+     * Construct a serialized value to transfer on wire.
+     *
+     * @param value nullable value
+     * @return serialized value to transfer on wire
+     * @throws IOException exception during value serialization
+     */
+    public static AkkaRpcSerializedValue valueOf(@Nullable Object value) throws IOException {
+        byte[] serializedData = value == null ? null : InstantiationUtil.serializeObject(value);
+        return new AkkaRpcSerializedValue(serializedData);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof AkkaRpcSerializedValue) {
+            AkkaRpcSerializedValue other = (AkkaRpcSerializedValue) o;
+            return Arrays.equals(serializedData, other.serializedData);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return Arrays.hashCode(serializedData);
+    }
+
+    @Override
+    public String toString() {
+        return serializedData == null ? "AkkaRpcSerializedValue(null)" : "AkkaRpcSerializedValue";
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 689e14f..c833027 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
 import akka.actor.ActorRef;
@@ -47,6 +48,8 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -535,6 +538,28 @@ public class AkkaRpcActorTest extends TestLogger {
         }
     }
 
+    @Test
+    public void canRespondWithSerializedValueLocally() throws Exception {
+        try (final SerializedValueRespondingEndpoint endpoint =
+                new SerializedValueRespondingEndpoint(akkaRpcService)) {
+            endpoint.start();
+
+            final SerializedValueRespondingGateway selfGateway =
+                    endpoint.getSelfGateway(SerializedValueRespondingGateway.class);
+
+            assertThat(
+                    selfGateway.getSerializedValueSynchronously(),
+                    equalTo(SerializedValueRespondingEndpoint.SERIALIZED_VALUE));
+
+            final CompletableFuture<SerializedValue<String>> responseFuture =
+                    selfGateway.getSerializedValue();
+
+            assertThat(
+                    responseFuture.get(),
+                    equalTo(SerializedValueRespondingEndpoint.SERIALIZED_VALUE));
+        }
+    }
+
     // ------------------------------------------------------------------------
     //  Test Actors and Interfaces
     // ------------------------------------------------------------------------
@@ -586,6 +611,41 @@ public class AkkaRpcActorTest extends TestLogger {
 
     // ------------------------------------------------------------------------
 
+    interface SerializedValueRespondingGateway extends RpcGateway {
+        CompletableFuture<SerializedValue<String>> getSerializedValue();
+
+        SerializedValue<String> getSerializedValueSynchronously();
+    }
+
+    static class SerializedValueRespondingEndpoint extends RpcEndpoint
+            implements SerializedValueRespondingGateway {
+        static final SerializedValue<String> SERIALIZED_VALUE;
+
+        static {
+            try {
+                SERIALIZED_VALUE = new SerializedValue<>("string-value");
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+        }
+
+        public SerializedValueRespondingEndpoint(RpcService rpcService) {
+            super(rpcService);
+        }
+
+        @Override
+        public CompletableFuture<SerializedValue<String>> getSerializedValue() {
+            return CompletableFuture.completedFuture(SERIALIZED_VALUE);
+        }
+
+        @Override
+        public SerializedValue<String> getSerializedValueSynchronously() {
+            return SERIALIZED_VALUE;
+        }
+    }
+
+    // ------------------------------------------------------------------------
+
     private interface ExceptionalGateway extends RpcGateway {
         CompletableFuture<Integer> doStuff();
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSerializedValueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSerializedValueTest.java
new file mode 100644
index 0000000..70263a6
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSerializedValueTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.Instant;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link AkkaRpcSerializedValue}. */
+public class AkkaRpcSerializedValueTest extends TestLogger {
+
+    @Test
+    public void testNullValue() throws Exception {
+        AkkaRpcSerializedValue serializedValue = AkkaRpcSerializedValue.valueOf(null);
+        assertThat(serializedValue.getSerializedData(), nullValue());
+        assertThat(serializedValue.getSerializedDataLength(), equalTo(0));
+        assertThat(serializedValue.deserializeValue(getClass().getClassLoader()), nullValue());
+
+        AkkaRpcSerializedValue otherSerializedValue = AkkaRpcSerializedValue.valueOf(null);
+        assertThat(otherSerializedValue, equalTo(serializedValue));
+        assertThat(otherSerializedValue.hashCode(), equalTo(serializedValue.hashCode()));
+
+        AkkaRpcSerializedValue clonedSerializedValue = InstantiationUtil.clone(serializedValue);
+        assertThat(clonedSerializedValue.getSerializedData(), nullValue());
+        assertThat(clonedSerializedValue.getSerializedDataLength(), equalTo(0));
+        assertThat(
+                clonedSerializedValue.deserializeValue(getClass().getClassLoader()), nullValue());
+        assertThat(clonedSerializedValue, equalTo(serializedValue));
+        assertThat(clonedSerializedValue.hashCode(), equalTo(serializedValue.hashCode()));
+    }
+
+    @Test
+    public void testNotNullValues() throws Exception {
+        Set<Object> values =
+                Stream.of(
+                                true,
+                                (byte) 5,
+                                (short) 6,
+                                5,
+                                5L,
+                                5.5F,
+                                6.5,
+                                'c',
+                                "string",
+                                Instant.now(),
+                                BigInteger.valueOf(Long.MAX_VALUE).add(BigInteger.TEN),
+                                BigDecimal.valueOf(Math.PI))
+                        .collect(Collectors.toSet());
+
+        Object previousValue = null;
+        AkkaRpcSerializedValue previousSerializedValue = null;
+        for (Object value : values) {
+            AkkaRpcSerializedValue serializedValue = AkkaRpcSerializedValue.valueOf(value);
+            assertThat(value.toString(), serializedValue.getSerializedData(), notNullValue());
+            assertThat(value.toString(), serializedValue.getSerializedDataLength(), greaterThan(0));
+            assertThat(
+                    value.toString(),
+                    serializedValue.deserializeValue(getClass().getClassLoader()),
+                    equalTo(value));
+
+            AkkaRpcSerializedValue otherSerializedValue = AkkaRpcSerializedValue.valueOf(value);
+            assertThat(value.toString(), otherSerializedValue, equalTo(serializedValue));
+            assertThat(
+                    value.toString(),
+                    otherSerializedValue.hashCode(),
+                    equalTo(serializedValue.hashCode()));
+
+            AkkaRpcSerializedValue clonedSerializedValue = InstantiationUtil.clone(serializedValue);
+            assertThat(
+                    value.toString(),
+                    clonedSerializedValue.getSerializedData(),
+                    equalTo(serializedValue.getSerializedData()));
+            assertThat(
+                    value.toString(),
+                    clonedSerializedValue.deserializeValue(getClass().getClassLoader()),
+                    equalTo(value));
+            assertThat(value.toString(), clonedSerializedValue, equalTo(serializedValue));
+            assertThat(
+                    value.toString(),
+                    clonedSerializedValue.hashCode(),
+                    equalTo(serializedValue.hashCode()));
+
+            if (previousValue != null && !previousValue.equals(value)) {
+                assertThat(
+                        value.toString() + " " + previousValue.toString(),
+                        serializedValue,
+                        not(equalTo(previousSerializedValue)));
+            }
+
+            previousValue = value;
+            previousSerializedValue = serializedValue;
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/RemoteAkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/RemoteAkkaRpcActorTest.java
index a9940d8..3cde7f1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/RemoteAkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/RemoteAkkaRpcActorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rpc.akka;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
@@ -32,6 +33,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
@@ -97,4 +99,30 @@ public class RemoteAkkaRpcActorTest extends TestLogger {
             assertThat(value, is(nullValue()));
         }
     }
+
+    @Test
+    public void canRespondWithSerializedValueRemotely() throws Exception {
+        try (final AkkaRpcActorTest.SerializedValueRespondingEndpoint endpoint =
+                new AkkaRpcActorTest.SerializedValueRespondingEndpoint(rpcService)) {
+            endpoint.start();
+
+            final AkkaRpcActorTest.SerializedValueRespondingGateway remoteGateway =
+                    otherRpcService
+                            .connect(
+                                    endpoint.getAddress(),
+                                    AkkaRpcActorTest.SerializedValueRespondingGateway.class)
+                            .join();
+
+            assertThat(
+                    remoteGateway.getSerializedValueSynchronously(),
+                    equalTo(AkkaRpcActorTest.SerializedValueRespondingEndpoint.SERIALIZED_VALUE));
+
+            final CompletableFuture<SerializedValue<String>> responseFuture =
+                    remoteGateway.getSerializedValue();
+
+            assertThat(
+                    responseFuture.get(),
+                    equalTo(AkkaRpcActorTest.SerializedValueRespondingEndpoint.SERIALIZED_VALUE));
+        }
+    }
 }


[flink] 02/02: [FLINK-20580][core] Does not accept null value for SerializedValue

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ed981be6601d3600d23b301d70d2bffff8aad3e6
Author: Kezhu Wang <ke...@gmail.com>
AuthorDate: Sat Feb 13 00:13:54 2021 +0800

    [FLINK-20580][core] Does not accept null value for SerializedValue
    
    This closes #14936.
---
 .../org/apache/flink/util/SerializedValue.java     | 42 +++++++++++++---------
 .../org/apache/flink/util/SerializedValueTest.java | 37 +++++++++++--------
 .../org/apache/flink/runtime/blob/BlobWriter.java  |  2 +-
 .../TaskExecutorOperatorEventHandlingTest.java     |  2 +-
 4 files changed, 51 insertions(+), 32 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/SerializedValue.java b/flink-core/src/main/java/org/apache/flink/util/SerializedValue.java
index fe0878d..ca3ccb0 100644
--- a/flink-core/src/main/java/org/apache/flink/util/SerializedValue.java
+++ b/flink-core/src/main/java/org/apache/flink/util/SerializedValue.java
@@ -20,8 +20,6 @@ package org.apache.flink.util;
 
 import org.apache.flink.annotation.Internal;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.Arrays;
 
@@ -43,35 +41,50 @@ public class SerializedValue<T> implements java.io.Serializable {
     private static final long serialVersionUID = -3564011643393683761L;
 
     /** The serialized data. */
-    @Nullable private final byte[] serializedData;
+    private final byte[] serializedData;
 
     private SerializedValue(byte[] serializedData) {
-        Preconditions.checkNotNull(serializedData, "Serialized data");
+        Preconditions.checkNotNull(serializedData, "Serialized data must not be null");
+        Preconditions.checkArgument(
+                serializedData.length != 0, "Serialized data must not be empty");
         this.serializedData = serializedData;
     }
 
+    /**
+     * Constructs a serialized value.
+     *
+     * @param value value to serialize
+     * @throws NullPointerException if value is null
+     * @throws IOException exception during serialization
+     */
     public SerializedValue(T value) throws IOException {
-        this.serializedData = value == null ? null : InstantiationUtil.serializeObject(value);
+        Preconditions.checkNotNull(value, "Value must not be null");
+        this.serializedData = InstantiationUtil.serializeObject(value);
     }
 
-    @SuppressWarnings("unchecked")
     public T deserializeValue(ClassLoader loader) throws IOException, ClassNotFoundException {
         Preconditions.checkNotNull(loader, "No classloader has been passed");
-        return serializedData == null
-                ? null
-                : (T) InstantiationUtil.deserializeObject(serializedData, loader);
+        return InstantiationUtil.deserializeObject(serializedData, loader);
     }
 
     /**
-     * Returns the serialized value or <code>null</code> if no value is set.
+     * Returns byte array for serialized data.
      *
      * @return Serialized data.
      */
-    @Nullable
     public byte[] getByteArray() {
         return serializedData;
     }
 
+    /**
+     * Constructs serialized value from serialized data.
+     *
+     * @param serializedData serialized data
+     * @param <T> type
+     * @return serialized value
+     * @throws NullPointerException if serialized data is null
+     * @throws IllegalArgumentException if serialized data is empty
+     */
     public static <T> SerializedValue<T> fromBytes(byte[] serializedData) {
         return new SerializedValue<>(serializedData);
     }
@@ -80,17 +93,14 @@ public class SerializedValue<T> implements java.io.Serializable {
 
     @Override
     public int hashCode() {
-        return serializedData == null ? 0 : Arrays.hashCode(serializedData);
+        return Arrays.hashCode(serializedData);
     }
 
     @Override
     public boolean equals(Object obj) {
         if (obj instanceof SerializedValue) {
             SerializedValue<?> other = (SerializedValue<?>) obj;
-            return this.serializedData == null
-                    ? other.serializedData == null
-                    : (other.serializedData != null
-                            && Arrays.equals(this.serializedData, other.serializedData));
+            return Arrays.equals(this.serializedData, other.serializedData);
         } else {
             return false;
         }
diff --git a/flink-core/src/test/java/org/apache/flink/util/SerializedValueTest.java b/flink-core/src/test/java/org/apache/flink/util/SerializedValueTest.java
index c2ce771..011d44f 100644
--- a/flink-core/src/test/java/org/apache/flink/util/SerializedValueTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/SerializedValueTest.java
@@ -22,9 +22,12 @@ import org.apache.flink.core.testutils.CommonTestUtils;
 
 import org.junit.Test;
 
+import java.util.Arrays;
+
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
 /** Tests for the {@link SerializedValue}. */
@@ -47,26 +50,32 @@ public class SerializedValueTest {
             assertNotNull(v.toString());
             assertNotNull(copy.toString());
 
+            assertNotEquals(0, v.getByteArray().length);
+            assertArrayEquals(v.getByteArray(), copy.getByteArray());
+
+            byte[] bytes = v.getByteArray();
+            SerializedValue<String> saved =
+                    SerializedValue.fromBytes(Arrays.copyOf(bytes, bytes.length));
+            assertEquals(v, saved);
+            assertArrayEquals(v.getByteArray(), saved.getByteArray());
         } catch (Exception e) {
             e.printStackTrace();
             fail(e.getMessage());
         }
     }
 
-    @Test
-    public void testNullValue() {
-        try {
-            SerializedValue<Object> v = new SerializedValue<>(null);
-            SerializedValue<Object> copy = CommonTestUtils.createCopySerializable(v);
+    @Test(expected = NullPointerException.class)
+    public void testNullValue() throws Exception {
+        new SerializedValue<>(null);
+    }
 
-            assertNull(copy.deserializeValue(getClass().getClassLoader()));
+    @Test(expected = NullPointerException.class)
+    public void testFromNullBytes() {
+        SerializedValue.fromBytes(null);
+    }
 
-            assertEquals(v, copy);
-            assertEquals(v.hashCode(), copy.hashCode());
-            assertEquals(v.toString(), copy.toString());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+    @Test(expected = IllegalArgumentException.class)
+    public void testFromEmptyBytes() {
+        SerializedValue.fromBytes(new byte[0]);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
index 242b573..7159348 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
@@ -85,7 +85,7 @@ public interface BlobWriter {
         final SerializedValue<T> serializedValue = new SerializedValue<>(value);
 
         if (serializedValue.getByteArray().length < blobWriter.getMinOffloadingSize()) {
-            return Either.Left(new SerializedValue<>(value));
+            return Either.Left(serializedValue);
         } else {
             try {
                 final PermanentBlobKey permanentBlobKey =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventHandlingTest.java
index f9d5c29..f84d513 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventHandlingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventHandlingTest.java
@@ -93,7 +93,7 @@ public class TaskExecutorOperatorEventHandlingTest extends TestLogger {
             final TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
             final CompletableFuture<?> resultFuture =
                     tmGateway.sendOperatorEventToTask(
-                            eid, new OperatorID(), new SerializedValue<>(null));
+                            eid, new OperatorID(), new SerializedValue<>(new TestOperatorEvent()));
 
             assertThat(
                     resultFuture,