You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gu...@apache.org on 2024/03/06 09:09:35 UTC
(flink) branch master updated: Revert "[FLINK-33532][network] Move the serialization of ShuffleDescriptorGroup out of the RPC main thread]"
This is an automated email from the ASF dual-hosted git repository.
guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7a709bf323c Revert "[FLINK-33532][network] Move the serialization of ShuffleDescriptorGroup out of the RPC main thread]"
7a709bf323c is described below
commit 7a709bf323c1cce3440887fe937311bae119aab0
Author: caodizhou <ca...@bytedance.com>
AuthorDate: Wed Mar 6 14:11:56 2024 +0800
Revert "[FLINK-33532][network] Move the serialization of ShuffleDescriptorGroup out of the RPC main thread]"
This reverts commit d18a4bfe596fc580f8280750fa3bfa22007671d9.
---
.../org/apache/flink/runtime/blob/BlobWriter.java | 11 ++--
.../deployment/CachedShuffleDescriptors.java | 2 +-
.../deployment/InputGateDeploymentDescriptor.java | 41 ++++++++++-----
.../deployment/TaskDeploymentDescriptor.java | 19 -------
.../TaskDeploymentDescriptorFactory.java | 58 ++++++++--------------
.../deployment/CachedShuffleDescriptorsTest.java | 30 ++++++-----
.../TaskDeploymentDescriptorTestUtils.java | 9 ++--
.../partition/consumer/SingleInputGateTest.java | 6 ++-
8 files changed, 83 insertions(+), 93 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
index 555cccfb7ca..2d5292b42cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
@@ -28,7 +28,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
-import java.util.Optional;
/** BlobWriter is used to upload data to the BLOB store. */
public interface BlobWriter {
@@ -103,13 +102,11 @@ public interface BlobWriter {
if (serializedValue.getByteArray().length < blobWriter.getMinOffloadingSize()) {
return Either.Left(serializedValue);
} else {
- return offloadWithException(serializedValue, jobId, blobWriter)
- .map(Either::<SerializedValue<T>, PermanentBlobKey>Right)
- .orElse(Either.Left(serializedValue));
+ return offloadWithException(serializedValue, jobId, blobWriter);
}
}
- static <T> Optional<PermanentBlobKey> offloadWithException(
+ static <T> Either<SerializedValue<T>, PermanentBlobKey> offloadWithException(
SerializedValue<T> serializedValue, JobID jobId, BlobWriter blobWriter) {
Preconditions.checkNotNull(serializedValue);
Preconditions.checkNotNull(jobId);
@@ -117,10 +114,10 @@ public interface BlobWriter {
try {
final PermanentBlobKey permanentBlobKey =
blobWriter.putPermanent(jobId, serializedValue.getByteArray());
- return Optional.of(permanentBlobKey);
+ return Either.Right(permanentBlobKey);
} catch (IOException e) {
LOG.warn("Failed to offload value for job {} to BLOB store.", jobId, e);
- return Optional.empty();
+ return Either.Left(serializedValue);
}
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java
index 4ddacbd671a..b8e0b44006f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java
@@ -87,7 +87,7 @@ public class CachedShuffleDescriptors {
new ShuffleDescriptorGroup(
toBeSerialized.toArray(new ShuffleDescriptorAndIndex[0]));
MaybeOffloaded<ShuffleDescriptorGroup> serializedShuffleDescriptorGroup =
- shuffleDescriptorSerializer.trySerializeAndOffloadShuffleDescriptor(
+ shuffleDescriptorSerializer.serializeAndTryOffloadShuffleDescriptor(
shuffleDescriptorGroup, numConsumers);
toBeSerialized.clear();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
index 4e02c699331..333a91e0a73 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.MaybeOffloaded;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.NonOffloadedRaw;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.NonOffloaded;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.Offloaded;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup;
@@ -98,7 +98,9 @@ public class InputGateDeploymentDescriptor implements Serializable {
new IndexRange(consumedSubpartitionIndex, consumedSubpartitionIndex),
inputChannels.length,
Collections.singletonList(
- new NonOffloadedRaw<>(new ShuffleDescriptorGroup(inputChannels))));
+ new NonOffloaded<>(
+ CompressedSerializedValue.fromObject(
+ new ShuffleDescriptorGroup(inputChannels)))));
}
public InputGateDeploymentDescriptor(
@@ -145,14 +147,18 @@ public class InputGateDeploymentDescriptor implements Serializable {
// This is only for testing scenarios, in a production environment we always call
// tryLoadAndDeserializeShuffleDescriptors to deserialize ShuffleDescriptors first.
inputChannels = new ShuffleDescriptor[numberOfInputChannels];
- for (MaybeOffloaded<ShuffleDescriptorGroup> rawShuffleDescriptors :
- serializedInputChannels) {
- checkState(
- rawShuffleDescriptors instanceof NonOffloadedRaw,
- "Trying to work with offloaded serialized shuffle descriptors.");
- NonOffloadedRaw<ShuffleDescriptorGroup> nonOffloadedRawValue =
- (NonOffloadedRaw<ShuffleDescriptorGroup>) rawShuffleDescriptors;
- putOrReplaceShuffleDescriptors(nonOffloadedRawValue.value);
+ try {
+ for (MaybeOffloaded<ShuffleDescriptorGroup> serializedShuffleDescriptors :
+ serializedInputChannels) {
+ checkState(
+ serializedShuffleDescriptors instanceof NonOffloaded,
+ "Trying to work with offloaded serialized shuffle descriptors.");
+ NonOffloaded<ShuffleDescriptorGroup> nonOffloadedSerializedValue =
+ (NonOffloaded<ShuffleDescriptorGroup>) serializedShuffleDescriptors;
+ tryDeserializeShuffleDescriptorGroup(nonOffloadedSerializedValue);
+ }
+ } catch (ClassNotFoundException | IOException e) {
+ throw new RuntimeException("Could not deserialize shuffle descriptors.", e);
}
}
return inputChannels;
@@ -207,12 +213,21 @@ public class InputGateDeploymentDescriptor implements Serializable {
}
putOrReplaceShuffleDescriptors(shuffleDescriptorGroup);
} else {
- NonOffloadedRaw<ShuffleDescriptorGroup> nonOffloadedSerializedValue =
- (NonOffloadedRaw<ShuffleDescriptorGroup>) serializedShuffleDescriptors;
- putOrReplaceShuffleDescriptors(nonOffloadedSerializedValue.value);
+ NonOffloaded<ShuffleDescriptorGroup> nonOffloadedSerializedValue =
+ (NonOffloaded<ShuffleDescriptorGroup>) serializedShuffleDescriptors;
+ tryDeserializeShuffleDescriptorGroup(nonOffloadedSerializedValue);
}
}
+ private void tryDeserializeShuffleDescriptorGroup(
+ NonOffloaded<ShuffleDescriptorGroup> nonOffloadedShuffleDescriptorGroup)
+ throws IOException, ClassNotFoundException {
+ ShuffleDescriptorGroup shuffleDescriptorGroup =
+ nonOffloadedShuffleDescriptorGroup.serializedValue.deserializeValue(
+ getClass().getClassLoader());
+ putOrReplaceShuffleDescriptors(shuffleDescriptorGroup);
+ }
+
private void putOrReplaceShuffleDescriptors(ShuffleDescriptorGroup shuffleDescriptorGroup) {
for (ShuffleDescriptorAndIndex shuffleDescriptorAndIndex :
shuffleDescriptorGroup.getShuffleDescriptors()) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 016105d9aac..5684066735f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -80,25 +80,6 @@ public final class TaskDeploymentDescriptor implements Serializable {
}
}
- /**
- * The raw value that is not offloaded to the {@link org.apache.flink.runtime.blob.BlobServer}.
- *
- * @param <T> type of the raw value
- */
- public static class NonOffloadedRaw<T> extends MaybeOffloaded<T> {
- private static final long serialVersionUID = 1L;
-
- /** The raw value. */
- public T value;
-
- @SuppressWarnings("unused")
- public NonOffloadedRaw() {}
-
- public NonOffloadedRaw(T value) {
- this.value = Preconditions.checkNotNull(value);
- }
- }
-
/**
* Reference to a serialized value that was offloaded to the {@link
* org.apache.flink.runtime.blob.BlobServer}.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
index d6a2b16010b..8b0498159a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
@@ -47,14 +47,11 @@ import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
import org.apache.flink.types.Either;
import org.apache.flink.util.CompressedSerializedValue;
-import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.SerializedValue;
import javax.annotation.Nullable;
import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
@@ -452,7 +449,7 @@ public class TaskDeploymentDescriptorFactory {
public static class ShuffleDescriptorGroup implements Serializable {
private static final long serialVersionUID = 1L;
- private ShuffleDescriptorAndIndex[] shuffleDescriptors;
+ private final ShuffleDescriptorAndIndex[] shuffleDescriptors;
public ShuffleDescriptorGroup(ShuffleDescriptorAndIndex[] shuffleDescriptors) {
this.shuffleDescriptors = checkNotNull(shuffleDescriptors);
@@ -461,31 +458,19 @@ public class TaskDeploymentDescriptorFactory {
public ShuffleDescriptorAndIndex[] getShuffleDescriptors() {
return shuffleDescriptors;
}
-
- private void writeObject(ObjectOutputStream oos) throws IOException {
- byte[] bytes = InstantiationUtil.serializeObjectAndCompress(shuffleDescriptors);
- oos.writeObject(bytes);
- }
-
- private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
- byte[] bytes = (byte[]) ois.readObject();
- shuffleDescriptors =
- InstantiationUtil.decompressAndDeserializeObject(
- bytes, ClassLoader.getSystemClassLoader());
- }
}
- /** Offload shuffle descriptors. */
+ /** Serialize shuffle descriptors. */
interface ShuffleDescriptorSerializer {
/**
- * Try to serialize and offload shuffle descriptors.
+ * Serialize and try offload shuffle descriptors.
*
- * @param shuffleDescriptorGroup to serialize and offload
+ * @param shuffleDescriptorGroup to serialize
* @param numConsumer consumers number of these shuffle descriptors, it means how many times
* serialized shuffle descriptor should be sent
- * @return offloaded serialized or non-offloaded raw shuffle descriptors
+ * @return offloaded or non-offloaded serialized shuffle descriptors
*/
- MaybeOffloaded<ShuffleDescriptorGroup> trySerializeAndOffloadShuffleDescriptor(
+ MaybeOffloaded<ShuffleDescriptorGroup> serializeAndTryOffloadShuffleDescriptor(
ShuffleDescriptorGroup shuffleDescriptorGroup, int numConsumer) throws IOException;
}
@@ -502,24 +487,25 @@ public class TaskDeploymentDescriptorFactory {
}
@Override
- public MaybeOffloaded<ShuffleDescriptorGroup> trySerializeAndOffloadShuffleDescriptor(
+ public MaybeOffloaded<ShuffleDescriptorGroup> serializeAndTryOffloadShuffleDescriptor(
ShuffleDescriptorGroup shuffleDescriptorGroup, int numConsumer) throws IOException {
- final Either<ShuffleDescriptorGroup, PermanentBlobKey> rawValueOrBlobKey =
- shouldOffload(shuffleDescriptorGroup.getShuffleDescriptors(), numConsumer)
- ? BlobWriter.offloadWithException(
- CompressedSerializedValue.fromObject(
- shuffleDescriptorGroup),
- jobID,
- blobWriter)
- .map(Either::<ShuffleDescriptorGroup, PermanentBlobKey>Right)
- .orElse(Either.Left(shuffleDescriptorGroup))
- : Either.Left(shuffleDescriptorGroup);
-
- if (rawValueOrBlobKey.isLeft()) {
- return new TaskDeploymentDescriptor.NonOffloadedRaw<>(rawValueOrBlobKey.left());
+ final CompressedSerializedValue<ShuffleDescriptorGroup> compressedSerializedValue =
+ CompressedSerializedValue.fromObject(shuffleDescriptorGroup);
+
+ final Either<SerializedValue<ShuffleDescriptorGroup>, PermanentBlobKey>
+ serializedValueOrBlobKey =
+ shouldOffload(
+ shuffleDescriptorGroup.getShuffleDescriptors(),
+ numConsumer)
+ ? BlobWriter.offloadWithException(
+ compressedSerializedValue, jobID, blobWriter)
+ : Either.Left(compressedSerializedValue);
+
+ if (serializedValueOrBlobKey.isLeft()) {
+ return new TaskDeploymentDescriptor.NonOffloaded<>(serializedValueOrBlobKey.left());
} else {
- return new TaskDeploymentDescriptor.Offloaded<>(rawValueOrBlobKey.right());
+ return new TaskDeploymentDescriptor.Offloaded<>(serializedValueOrBlobKey.right());
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptorsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptorsTest.java
index 0160d180e23..f9cd00e103b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptorsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptorsTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.deployment;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.MaybeOffloaded;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.NonOffloadedRaw;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.NonOffloaded;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -39,10 +39,12 @@ import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.util.CompressedSerializedValue;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -88,7 +90,7 @@ class CachedShuffleDescriptorsTest {
assertThat(cachedShuffleDescriptors.getAllSerializedShuffleDescriptorGroups()).hasSize(1);
MaybeOffloaded<ShuffleDescriptorGroup> maybeOffloadedShuffleDescriptor =
cachedShuffleDescriptors.getAllSerializedShuffleDescriptorGroups().get(0);
- assertNonOffloadedRawShuffleDescriptorAndIndexEquals(
+ assertNonOffloadedShuffleDescriptorAndIndexEquals(
maybeOffloadedShuffleDescriptor,
Collections.singletonList(shuffleDescriptor),
Collections.singletonList(0));
@@ -142,22 +144,26 @@ class CachedShuffleDescriptorsTest {
intermediateResultPartition2,
TaskDeploymentDescriptorFactory.PartitionLocationConstraint.MUST_BE_KNOWN,
false);
- assertNonOffloadedRawShuffleDescriptorAndIndexEquals(
+ assertNonOffloadedShuffleDescriptorAndIndexEquals(
maybeOffloaded,
Arrays.asList(expectedShuffleDescriptor1, expectedShuffleDescriptor2),
Arrays.asList(0, 1));
}
- private void assertNonOffloadedRawShuffleDescriptorAndIndexEquals(
+ private void assertNonOffloadedShuffleDescriptorAndIndexEquals(
MaybeOffloaded<ShuffleDescriptorGroup> maybeOffloaded,
List<ShuffleDescriptor> expectedDescriptors,
- List<Integer> expectedIndices) {
+ List<Integer> expectedIndices)
+ throws Exception {
assertThat(expectedDescriptors).hasSameSizeAs(expectedIndices);
- assertThat(maybeOffloaded).isInstanceOf(NonOffloadedRaw.class);
- NonOffloadedRaw<ShuffleDescriptorGroup> nonOffloadedRaw =
- (NonOffloadedRaw<ShuffleDescriptorGroup>) maybeOffloaded;
+ assertThat(maybeOffloaded).isInstanceOf(NonOffloaded.class);
+ NonOffloaded<ShuffleDescriptorGroup> nonOffloaded =
+ (NonOffloaded<ShuffleDescriptorGroup>) maybeOffloaded;
ShuffleDescriptorAndIndex[] shuffleDescriptorAndIndices =
- nonOffloadedRaw.value.getShuffleDescriptors();
+ nonOffloaded
+ .serializedValue
+ .deserializeValue(getClass().getClassLoader())
+ .getShuffleDescriptors();
assertThat(shuffleDescriptorAndIndices).hasSameSizeAs(expectedDescriptors);
for (int i = 0; i < shuffleDescriptorAndIndices.length; i++) {
assertThat(shuffleDescriptorAndIndices[i].getIndex()).isEqualTo(expectedIndices.get(i));
@@ -212,9 +218,9 @@ class CachedShuffleDescriptorsTest {
implements TaskDeploymentDescriptorFactory.ShuffleDescriptorSerializer {
@Override
- public MaybeOffloaded<ShuffleDescriptorGroup> trySerializeAndOffloadShuffleDescriptor(
- ShuffleDescriptorGroup shuffleDescriptorGroup, int numConsumer) {
- return new NonOffloadedRaw<>(shuffleDescriptorGroup);
+ public MaybeOffloaded<ShuffleDescriptorGroup> serializeAndTryOffloadShuffleDescriptor(
+ ShuffleDescriptorGroup shuffleDescriptorGroup, int numConsumer) throws IOException {
+ return new NonOffloaded<>(CompressedSerializedValue.fromObject(shuffleDescriptorGroup));
}
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTestUtils.java
index 04683d4489c..19fbefe2920 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTestUtils.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.deployment;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.blob.TestingBlobWriter;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.MaybeOffloaded;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.NonOffloadedRaw;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.NonOffloaded;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.Offloaded;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup;
@@ -47,8 +47,11 @@ public class TaskDeploymentDescriptorTestUtils {
int maxIndex = 0;
for (MaybeOffloaded<ShuffleDescriptorGroup> sd : maybeOffloaded) {
ShuffleDescriptorGroup shuffleDescriptorGroup;
- if (sd instanceof NonOffloadedRaw) {
- shuffleDescriptorGroup = ((NonOffloadedRaw<ShuffleDescriptorGroup>) sd).value;
+ if (sd instanceof NonOffloaded) {
+ shuffleDescriptorGroup =
+ ((NonOffloaded<ShuffleDescriptorGroup>) sd)
+ .serializedValue.deserializeValue(
+ ClassLoader.getSystemClassLoader());
} else {
final CompressedSerializedValue<ShuffleDescriptorGroup> compressedSerializedValue =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 071a23ad34b..0ae74209507 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -71,6 +71,7 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
+import org.apache.flink.util.CompressedSerializedValue;
import org.apache.flink.shaded.guava31.com.google.common.io.Closer;
@@ -1312,8 +1313,9 @@ class SingleInputGateTest extends InputGateTestBase {
subpartitionIndexRange,
channelDescs.length,
Collections.singletonList(
- new TaskDeploymentDescriptor.NonOffloadedRaw<>(
- new ShuffleDescriptorGroup(channelDescs))));
+ new TaskDeploymentDescriptor.NonOffloaded<>(
+ CompressedSerializedValue.fromObject(
+ new ShuffleDescriptorGroup(channelDescs)))));
final TaskMetricGroup taskMetricGroup =
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();