You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/01/24 09:52:30 UTC
[flink] branch release-1.14 updated: [FLINK-25732][coordination] Pass serializable collection
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new e358ac6 [FLINK-25732][coordination] Pass serializable collection
e358ac6 is described below
commit e358ac67d2d619fee0c12bf875c17728224dd2a5
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Jan 20 13:40:38 2022 +0100
[FLINK-25732][coordination] Pass serializable collection
---
.../apache/flink/runtime/dispatcher/Dispatcher.java | 2 +-
.../flink/runtime/dispatcher/DispatcherTest.java | 20 ++++++++++++++++++++
2 files changed, 21 insertions(+), 1 deletion(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index a3049dc..121990e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -585,7 +585,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
completedJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job));
runningJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job));
- return new MultipleJobsDetails(deduplicatedJobs.values());
+ return new MultipleJobsDetails(new HashSet<>(deduplicatedJobs.values()));
});
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 130be52..68f818c 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -79,6 +79,7 @@ import org.apache.flink.runtime.testutils.TestingJobGraphStore;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TimeUtils;
import org.apache.flink.util.function.ThrowingRunnable;
@@ -919,6 +920,25 @@ public class DispatcherTest extends AbstractDispatcherTest {
JobStatus.FINISHED, dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get());
}
+ @Test
+ public void testRequestMultipleJobDetails_isSerializable() throws Exception {
+ final JobManagerRunnerFactory blockingJobMaster =
+ new QueuedJobManagerRunnerFactory(
+ completedJobManagerRunnerWithJobStatus(JobStatus.SUSPENDED));
+
+ dispatcher = createAndStartDispatcher(heartbeatServices, haServices, blockingJobMaster);
+ DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+ jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+ dispatcherGateway.requestJobResult(jobId, TIMEOUT).get();
+
+ final MultipleJobsDetails multipleJobsDetails =
+ dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get();
+
+ InstantiationUtil.serializeObject(multipleJobsDetails);
+ }
+
private JobManagerRunner runningJobManagerRunnerWithJobStatus(
final JobStatus currentJobStatus) {
Preconditions.checkArgument(!currentJobStatus.isTerminalState());