You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/01/24 09:52:30 UTC

[flink] branch release-1.14 updated: [FLINK-25732][coordination] Pass serializable collection

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new e358ac6  [FLINK-25732][coordination] Pass serializable collection
e358ac6 is described below

commit e358ac67d2d619fee0c12bf875c17728224dd2a5
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Jan 20 13:40:38 2022 +0100

    [FLINK-25732][coordination] Pass serializable collection
---
 .../apache/flink/runtime/dispatcher/Dispatcher.java  |  2 +-
 .../flink/runtime/dispatcher/DispatcherTest.java     | 20 ++++++++++++++++++++
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index a3049dc..121990e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -585,7 +585,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
                     completedJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job));
                     runningJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job));
 
-                    return new MultipleJobsDetails(deduplicatedJobs.values());
+                    return new MultipleJobsDetails(new HashSet<>(deduplicatedJobs.values()));
                 });
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 130be52..68f818c 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -79,6 +79,7 @@ import org.apache.flink.runtime.testutils.TestingJobGraphStore;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TimeUtils;
 import org.apache.flink.util.function.ThrowingRunnable;
@@ -919,6 +920,25 @@ public class DispatcherTest extends AbstractDispatcherTest {
                 JobStatus.FINISHED, dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get());
     }
 
+    @Test
+    public void testRequestMultipleJobDetails_isSerializable() throws Exception {
+        final JobManagerRunnerFactory blockingJobMaster =
+                new QueuedJobManagerRunnerFactory(
+                        completedJobManagerRunnerWithJobStatus(JobStatus.SUSPENDED));
+
+        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, blockingJobMaster);
+        DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+        jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+
+        dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+        dispatcherGateway.requestJobResult(jobId, TIMEOUT).get();
+
+        final MultipleJobsDetails multipleJobsDetails =
+                dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get();
+
+        InstantiationUtil.serializeObject(multipleJobsDetails);
+    }
+
     private JobManagerRunner runningJobManagerRunnerWithJobStatus(
             final JobStatus currentJobStatus) {
         Preconditions.checkArgument(!currentJobStatus.isTerminalState());