You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gu...@apache.org on 2024/03/06 09:09:35 UTC

(flink) branch master updated: Revert "[FLINK-33532][network] Move the serialization of ShuffleDescriptorGroup out of the RPC main thread]"

This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a709bf323c Revert "[FLINK-33532][network] Move the serialization of ShuffleDescriptorGroup out of the RPC main thread]"
7a709bf323c is described below

commit 7a709bf323c1cce3440887fe937311bae119aab0
Author: caodizhou <ca...@bytedance.com>
AuthorDate: Wed Mar 6 14:11:56 2024 +0800

    Revert "[FLINK-33532][network] Move the serialization of ShuffleDescriptorGroup out of the RPC main thread]"
    
    This reverts commit d18a4bfe596fc580f8280750fa3bfa22007671d9.
---
 .../org/apache/flink/runtime/blob/BlobWriter.java  | 11 ++--
 .../deployment/CachedShuffleDescriptors.java       |  2 +-
 .../deployment/InputGateDeploymentDescriptor.java  | 41 ++++++++++-----
 .../deployment/TaskDeploymentDescriptor.java       | 19 -------
 .../TaskDeploymentDescriptorFactory.java           | 58 ++++++++--------------
 .../deployment/CachedShuffleDescriptorsTest.java   | 30 ++++++-----
 .../TaskDeploymentDescriptorTestUtils.java         |  9 ++--
 .../partition/consumer/SingleInputGateTest.java    |  6 ++-
 8 files changed, 83 insertions(+), 93 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
index 555cccfb7ca..2d5292b42cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
@@ -28,7 +28,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.Optional;
 
 /** BlobWriter is used to upload data to the BLOB store. */
 public interface BlobWriter {
@@ -103,13 +102,11 @@ public interface BlobWriter {
         if (serializedValue.getByteArray().length < blobWriter.getMinOffloadingSize()) {
             return Either.Left(serializedValue);
         } else {
-            return offloadWithException(serializedValue, jobId, blobWriter)
-                    .map(Either::<SerializedValue<T>, PermanentBlobKey>Right)
-                    .orElse(Either.Left(serializedValue));
+            return offloadWithException(serializedValue, jobId, blobWriter);
         }
     }
 
-    static <T> Optional<PermanentBlobKey> offloadWithException(
+    static <T> Either<SerializedValue<T>, PermanentBlobKey> offloadWithException(
             SerializedValue<T> serializedValue, JobID jobId, BlobWriter blobWriter) {
         Preconditions.checkNotNull(serializedValue);
         Preconditions.checkNotNull(jobId);
@@ -117,10 +114,10 @@ public interface BlobWriter {
         try {
             final PermanentBlobKey permanentBlobKey =
                     blobWriter.putPermanent(jobId, serializedValue.getByteArray());
-            return Optional.of(permanentBlobKey);
+            return Either.Right(permanentBlobKey);
         } catch (IOException e) {
             LOG.warn("Failed to offload value for job {} to BLOB store.", jobId, e);
-            return Optional.empty();
+            return Either.Left(serializedValue);
         }
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java
index 4ddacbd671a..b8e0b44006f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java
@@ -87,7 +87,7 @@ public class CachedShuffleDescriptors {
                     new ShuffleDescriptorGroup(
                             toBeSerialized.toArray(new ShuffleDescriptorAndIndex[0]));
             MaybeOffloaded<ShuffleDescriptorGroup> serializedShuffleDescriptorGroup =
-                    shuffleDescriptorSerializer.trySerializeAndOffloadShuffleDescriptor(
+                    shuffleDescriptorSerializer.serializeAndTryOffloadShuffleDescriptor(
                             shuffleDescriptorGroup, numConsumers);
 
             toBeSerialized.clear();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
index 4e02c699331..333a91e0a73 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.blob.PermanentBlobService;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.MaybeOffloaded;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.NonOffloadedRaw;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.NonOffloaded;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.Offloaded;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup;
@@ -98,7 +98,9 @@ public class InputGateDeploymentDescriptor implements Serializable {
                 new IndexRange(consumedSubpartitionIndex, consumedSubpartitionIndex),
                 inputChannels.length,
                 Collections.singletonList(
-                        new NonOffloadedRaw<>(new ShuffleDescriptorGroup(inputChannels))));
+                        new NonOffloaded<>(
+                                CompressedSerializedValue.fromObject(
+                                        new ShuffleDescriptorGroup(inputChannels)))));
     }
 
     public InputGateDeploymentDescriptor(
@@ -145,14 +147,18 @@ public class InputGateDeploymentDescriptor implements Serializable {
             // This is only for testing scenarios, in a production environment we always call
             // tryLoadAndDeserializeShuffleDescriptors to deserialize ShuffleDescriptors first.
             inputChannels = new ShuffleDescriptor[numberOfInputChannels];
-            for (MaybeOffloaded<ShuffleDescriptorGroup> rawShuffleDescriptors :
-                    serializedInputChannels) {
-                checkState(
-                        rawShuffleDescriptors instanceof NonOffloadedRaw,
-                        "Trying to work with offloaded serialized shuffle descriptors.");
-                NonOffloadedRaw<ShuffleDescriptorGroup> nonOffloadedRawValue =
-                        (NonOffloadedRaw<ShuffleDescriptorGroup>) rawShuffleDescriptors;
-                putOrReplaceShuffleDescriptors(nonOffloadedRawValue.value);
+            try {
+                for (MaybeOffloaded<ShuffleDescriptorGroup> serializedShuffleDescriptors :
+                        serializedInputChannels) {
+                    checkState(
+                            serializedShuffleDescriptors instanceof NonOffloaded,
+                            "Trying to work with offloaded serialized shuffle descriptors.");
+                    NonOffloaded<ShuffleDescriptorGroup> nonOffloadedSerializedValue =
+                            (NonOffloaded<ShuffleDescriptorGroup>) serializedShuffleDescriptors;
+                    tryDeserializeShuffleDescriptorGroup(nonOffloadedSerializedValue);
+                }
+            } catch (ClassNotFoundException | IOException e) {
+                throw new RuntimeException("Could not deserialize shuffle descriptors.", e);
             }
         }
         return inputChannels;
@@ -207,12 +213,21 @@ public class InputGateDeploymentDescriptor implements Serializable {
             }
             putOrReplaceShuffleDescriptors(shuffleDescriptorGroup);
         } else {
-            NonOffloadedRaw<ShuffleDescriptorGroup> nonOffloadedSerializedValue =
-                    (NonOffloadedRaw<ShuffleDescriptorGroup>) serializedShuffleDescriptors;
-            putOrReplaceShuffleDescriptors(nonOffloadedSerializedValue.value);
+            NonOffloaded<ShuffleDescriptorGroup> nonOffloadedSerializedValue =
+                    (NonOffloaded<ShuffleDescriptorGroup>) serializedShuffleDescriptors;
+            tryDeserializeShuffleDescriptorGroup(nonOffloadedSerializedValue);
         }
     }
 
+    private void tryDeserializeShuffleDescriptorGroup(
+            NonOffloaded<ShuffleDescriptorGroup> nonOffloadedShuffleDescriptorGroup)
+            throws IOException, ClassNotFoundException {
+        ShuffleDescriptorGroup shuffleDescriptorGroup =
+                nonOffloadedShuffleDescriptorGroup.serializedValue.deserializeValue(
+                        getClass().getClassLoader());
+        putOrReplaceShuffleDescriptors(shuffleDescriptorGroup);
+    }
+
     private void putOrReplaceShuffleDescriptors(ShuffleDescriptorGroup shuffleDescriptorGroup) {
         for (ShuffleDescriptorAndIndex shuffleDescriptorAndIndex :
                 shuffleDescriptorGroup.getShuffleDescriptors()) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 016105d9aac..5684066735f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -80,25 +80,6 @@ public final class TaskDeploymentDescriptor implements Serializable {
         }
     }
 
-    /**
-     * The raw value that is not offloaded to the {@link org.apache.flink.runtime.blob.BlobServer}.
-     *
-     * @param <T> type of the raw value
-     */
-    public static class NonOffloadedRaw<T> extends MaybeOffloaded<T> {
-        private static final long serialVersionUID = 1L;
-
-        /** The raw value. */
-        public T value;
-
-        @SuppressWarnings("unused")
-        public NonOffloadedRaw() {}
-
-        public NonOffloadedRaw(T value) {
-            this.value = Preconditions.checkNotNull(value);
-        }
-    }
-
     /**
      * Reference to a serialized value that was offloaded to the {@link
      * org.apache.flink.runtime.blob.BlobServer}.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
index d6a2b16010b..8b0498159a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
@@ -47,14 +47,11 @@ import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
 import org.apache.flink.types.Either;
 import org.apache.flink.util.CompressedSerializedValue;
-import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.SerializedValue;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -452,7 +449,7 @@ public class TaskDeploymentDescriptorFactory {
     public static class ShuffleDescriptorGroup implements Serializable {
         private static final long serialVersionUID = 1L;
 
-        private ShuffleDescriptorAndIndex[] shuffleDescriptors;
+        private final ShuffleDescriptorAndIndex[] shuffleDescriptors;
 
         public ShuffleDescriptorGroup(ShuffleDescriptorAndIndex[] shuffleDescriptors) {
             this.shuffleDescriptors = checkNotNull(shuffleDescriptors);
@@ -461,31 +458,19 @@ public class TaskDeploymentDescriptorFactory {
         public ShuffleDescriptorAndIndex[] getShuffleDescriptors() {
             return shuffleDescriptors;
         }
-
-        private void writeObject(ObjectOutputStream oos) throws IOException {
-            byte[] bytes = InstantiationUtil.serializeObjectAndCompress(shuffleDescriptors);
-            oos.writeObject(bytes);
-        }
-
-        private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
-            byte[] bytes = (byte[]) ois.readObject();
-            shuffleDescriptors =
-                    InstantiationUtil.decompressAndDeserializeObject(
-                            bytes, ClassLoader.getSystemClassLoader());
-        }
     }
 
-    /** Offload shuffle descriptors. */
+    /** Serialize shuffle descriptors. */
     interface ShuffleDescriptorSerializer {
         /**
-         * Try to serialize and offload shuffle descriptors.
+         * Serialize and try to offload shuffle descriptors.
          *
-         * @param shuffleDescriptorGroup to serialize and offload
+         * @param shuffleDescriptorGroup to serialize
          * @param numConsumer consumers number of these shuffle descriptors, it means how many times
          *     serialized shuffle descriptor should be sent
-         * @return offloaded serialized or non-offloaded raw shuffle descriptors
+         * @return offloaded or non-offloaded serialized shuffle descriptors
          */
-        MaybeOffloaded<ShuffleDescriptorGroup> trySerializeAndOffloadShuffleDescriptor(
+        MaybeOffloaded<ShuffleDescriptorGroup> serializeAndTryOffloadShuffleDescriptor(
                 ShuffleDescriptorGroup shuffleDescriptorGroup, int numConsumer) throws IOException;
     }
 
@@ -502,24 +487,25 @@ public class TaskDeploymentDescriptorFactory {
         }
 
         @Override
-        public MaybeOffloaded<ShuffleDescriptorGroup> trySerializeAndOffloadShuffleDescriptor(
+        public MaybeOffloaded<ShuffleDescriptorGroup> serializeAndTryOffloadShuffleDescriptor(
                 ShuffleDescriptorGroup shuffleDescriptorGroup, int numConsumer) throws IOException {
 
-            final Either<ShuffleDescriptorGroup, PermanentBlobKey> rawValueOrBlobKey =
-                    shouldOffload(shuffleDescriptorGroup.getShuffleDescriptors(), numConsumer)
-                            ? BlobWriter.offloadWithException(
-                                            CompressedSerializedValue.fromObject(
-                                                    shuffleDescriptorGroup),
-                                            jobID,
-                                            blobWriter)
-                                    .map(Either::<ShuffleDescriptorGroup, PermanentBlobKey>Right)
-                                    .orElse(Either.Left(shuffleDescriptorGroup))
-                            : Either.Left(shuffleDescriptorGroup);
-
-            if (rawValueOrBlobKey.isLeft()) {
-                return new TaskDeploymentDescriptor.NonOffloadedRaw<>(rawValueOrBlobKey.left());
+            final CompressedSerializedValue<ShuffleDescriptorGroup> compressedSerializedValue =
+                    CompressedSerializedValue.fromObject(shuffleDescriptorGroup);
+
+            final Either<SerializedValue<ShuffleDescriptorGroup>, PermanentBlobKey>
+                    serializedValueOrBlobKey =
+                            shouldOffload(
+                                            shuffleDescriptorGroup.getShuffleDescriptors(),
+                                            numConsumer)
+                                    ? BlobWriter.offloadWithException(
+                                            compressedSerializedValue, jobID, blobWriter)
+                                    : Either.Left(compressedSerializedValue);
+
+            if (serializedValueOrBlobKey.isLeft()) {
+                return new TaskDeploymentDescriptor.NonOffloaded<>(serializedValueOrBlobKey.left());
             } else {
-                return new TaskDeploymentDescriptor.Offloaded<>(rawValueOrBlobKey.right());
+                return new TaskDeploymentDescriptor.Offloaded<>(serializedValueOrBlobKey.right());
             }
         }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptorsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptorsTest.java
index 0160d180e23..f9cd00e103b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptorsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptorsTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.deployment;
 
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.MaybeOffloaded;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.NonOffloadedRaw;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.NonOffloaded;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -39,10 +39,12 @@ import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.util.CompressedSerializedValue;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -88,7 +90,7 @@ class CachedShuffleDescriptorsTest {
         assertThat(cachedShuffleDescriptors.getAllSerializedShuffleDescriptorGroups()).hasSize(1);
         MaybeOffloaded<ShuffleDescriptorGroup> maybeOffloadedShuffleDescriptor =
                 cachedShuffleDescriptors.getAllSerializedShuffleDescriptorGroups().get(0);
-        assertNonOffloadedRawShuffleDescriptorAndIndexEquals(
+        assertNonOffloadedShuffleDescriptorAndIndexEquals(
                 maybeOffloadedShuffleDescriptor,
                 Collections.singletonList(shuffleDescriptor),
                 Collections.singletonList(0));
@@ -142,22 +144,26 @@ class CachedShuffleDescriptorsTest {
                         intermediateResultPartition2,
                         TaskDeploymentDescriptorFactory.PartitionLocationConstraint.MUST_BE_KNOWN,
                         false);
-        assertNonOffloadedRawShuffleDescriptorAndIndexEquals(
+        assertNonOffloadedShuffleDescriptorAndIndexEquals(
                 maybeOffloaded,
                 Arrays.asList(expectedShuffleDescriptor1, expectedShuffleDescriptor2),
                 Arrays.asList(0, 1));
     }
 
-    private void assertNonOffloadedRawShuffleDescriptorAndIndexEquals(
+    private void assertNonOffloadedShuffleDescriptorAndIndexEquals(
             MaybeOffloaded<ShuffleDescriptorGroup> maybeOffloaded,
             List<ShuffleDescriptor> expectedDescriptors,
-            List<Integer> expectedIndices) {
+            List<Integer> expectedIndices)
+            throws Exception {
         assertThat(expectedDescriptors).hasSameSizeAs(expectedIndices);
-        assertThat(maybeOffloaded).isInstanceOf(NonOffloadedRaw.class);
-        NonOffloadedRaw<ShuffleDescriptorGroup> nonOffloadedRaw =
-                (NonOffloadedRaw<ShuffleDescriptorGroup>) maybeOffloaded;
+        assertThat(maybeOffloaded).isInstanceOf(NonOffloaded.class);
+        NonOffloaded<ShuffleDescriptorGroup> nonOffloaded =
+                (NonOffloaded<ShuffleDescriptorGroup>) maybeOffloaded;
         ShuffleDescriptorAndIndex[] shuffleDescriptorAndIndices =
-                nonOffloadedRaw.value.getShuffleDescriptors();
+                nonOffloaded
+                        .serializedValue
+                        .deserializeValue(getClass().getClassLoader())
+                        .getShuffleDescriptors();
         assertThat(shuffleDescriptorAndIndices).hasSameSizeAs(expectedDescriptors);
         for (int i = 0; i < shuffleDescriptorAndIndices.length; i++) {
             assertThat(shuffleDescriptorAndIndices[i].getIndex()).isEqualTo(expectedIndices.get(i));
@@ -212,9 +218,9 @@ class CachedShuffleDescriptorsTest {
             implements TaskDeploymentDescriptorFactory.ShuffleDescriptorSerializer {
 
         @Override
-        public MaybeOffloaded<ShuffleDescriptorGroup> trySerializeAndOffloadShuffleDescriptor(
-                ShuffleDescriptorGroup shuffleDescriptorGroup, int numConsumer) {
-            return new NonOffloadedRaw<>(shuffleDescriptorGroup);
+        public MaybeOffloaded<ShuffleDescriptorGroup> serializeAndTryOffloadShuffleDescriptor(
+                ShuffleDescriptorGroup shuffleDescriptorGroup, int numConsumer) throws IOException {
+            return new NonOffloaded<>(CompressedSerializedValue.fromObject(shuffleDescriptorGroup));
         }
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTestUtils.java
index 04683d4489c..19fbefe2920 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTestUtils.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.deployment;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.blob.TestingBlobWriter;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.MaybeOffloaded;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.NonOffloadedRaw;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.NonOffloaded;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.Offloaded;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup;
@@ -47,8 +47,11 @@ public class TaskDeploymentDescriptorTestUtils {
         int maxIndex = 0;
         for (MaybeOffloaded<ShuffleDescriptorGroup> sd : maybeOffloaded) {
             ShuffleDescriptorGroup shuffleDescriptorGroup;
-            if (sd instanceof NonOffloadedRaw) {
-                shuffleDescriptorGroup = ((NonOffloadedRaw<ShuffleDescriptorGroup>) sd).value;
+            if (sd instanceof NonOffloaded) {
+                shuffleDescriptorGroup =
+                        ((NonOffloaded<ShuffleDescriptorGroup>) sd)
+                                .serializedValue.deserializeValue(
+                                        ClassLoader.getSystemClassLoader());
 
             } else {
                 final CompressedSerializedValue<ShuffleDescriptorGroup> compressedSerializedValue =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 071a23ad34b..0ae74209507 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -71,6 +71,7 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
 import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
+import org.apache.flink.util.CompressedSerializedValue;
 
 import org.apache.flink.shaded.guava31.com.google.common.io.Closer;
 
@@ -1312,8 +1313,9 @@ class SingleInputGateTest extends InputGateTestBase {
                         subpartitionIndexRange,
                         channelDescs.length,
                         Collections.singletonList(
-                                new TaskDeploymentDescriptor.NonOffloadedRaw<>(
-                                        new ShuffleDescriptorGroup(channelDescs))));
+                                new TaskDeploymentDescriptor.NonOffloaded<>(
+                                        CompressedSerializedValue.fromObject(
+                                                new ShuffleDescriptorGroup(channelDescs)))));
 
         final TaskMetricGroup taskMetricGroup =
                 UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();