You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2020/05/14 23:47:22 UTC
[flink] branch master updated: [FLINK-14807][rest] Introduce REST API for communication between clients and operator coordinators
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 9fe920f [FLINK-14807][rest] Introduce REST API for communication between clients and operator coordinators
9fe920f is described below
commit 9fe920fdc6f7eeaf2d99901099c842cfd0f1380a
Author: TsReaper <ts...@gmail.com>
AuthorDate: Fri May 15 07:46:31 2020 +0800
[FLINK-14807][rest] Introduce REST API for communication between clients and operator coordinators
This closes #12037
---
.../deployment/ClusterClientJobClientAdapter.java | 14 ++-
.../deployment/application/EmbeddedJobClient.java | 20 +++-
.../apache/flink/client/program/ClusterClient.java | 13 +++
.../flink/client/program/MiniClusterClient.java | 20 ++++
.../client/program/PerJobMiniClusterFactory.java | 19 +++-
.../client/program/rest/RestClusterClient.java | 37 ++++++++
.../flink/client/program/TestingClusterClient.java | 11 +++
.../client/program/rest/RestClusterClientTest.java | 75 +++++++++++++++
.../src/test/resources/rest_api_v1.snapshot | 33 +++++++
.../flink/runtime/dispatcher/Dispatcher.java | 17 ++++
.../apache/flink/runtime/jobmaster/JobMaster.java | 15 +++
.../flink/runtime/jobmaster/JobMasterGateway.java | 19 ++++
.../flink/runtime/minicluster/MiniCluster.java | 14 +++
.../coordination/CoordinationRequest.java | 27 ++++++
.../coordination/CoordinationRequestGateway.java | 39 ++++++++
.../coordination/CoordinationRequestHandler.java | 36 +++++++
.../coordination/CoordinationResponse.java | 27 ++++++
.../coordination/ClientCoordinationHandler.java | 84 +++++++++++++++++
.../rest/messages/OperatorIDPathParameter.java | 49 ++++++++++
.../coordination/ClientCoordinationHeaders.java | 80 ++++++++++++++++
.../ClientCoordinationMessageParameters.java | 49 ++++++++++
.../ClientCoordinationRequestBody.java | 56 +++++++++++
.../ClientCoordinationResponseBody.java | 56 +++++++++++
.../flink/runtime/scheduler/SchedulerBase.java | 36 ++++++-
.../flink/runtime/scheduler/SchedulerNG.java | 12 +++
.../flink/runtime/webmonitor/RestfulGateway.java | 24 +++++
.../runtime/webmonitor/WebMonitorEndpoint.java | 9 ++
.../jobmaster/utils/TestingJobMasterGateway.java | 14 ++-
.../utils/TestingJobMasterGatewayBuilder.java | 11 ++-
.../OperatorCoordinatorSchedulerTest.java | 48 ++++++++++
.../TestingCoordinationRequestHandler.java | 104 +++++++++++++++++++++
.../webmonitor/TestingDispatcherGateway.java | 14 ++-
.../runtime/webmonitor/TestingRestfulGateway.java | 34 ++++++-
.../environment/RemoteStreamEnvironmentTest.java | 11 +++
34 files changed, 1112 insertions(+), 15 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
index 3fb0e5c..94c31f2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
@@ -26,6 +26,10 @@ import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequestGateway;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.commons.io.IOUtils;
@@ -41,7 +45,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* An implementation of the {@link JobClient} interface that uses a {@link ClusterClient} underneath..
*/
-public class ClusterClientJobClientAdapter<ClusterID> implements JobClient {
+public class ClusterClientJobClientAdapter<ClusterID> implements JobClient, CoordinationRequestGateway {
private final ClusterClientProvider<ClusterID> clusterClientProvider;
@@ -115,6 +119,13 @@ public class ClusterClientJobClientAdapter<ClusterID> implements JobClient {
})));
}
+ @Override
+ public CompletableFuture<CoordinationResponse> sendCoordinationRequest(OperatorID operatorId, CoordinationRequest request) {
+ return bridgeClientRequest(
+ clusterClientProvider,
+ clusterClient -> clusterClient.sendCoordinationRequest(jobID, operatorId, request));
+ }
+
private static <T> CompletableFuture<T> bridgeClientRequest(
ClusterClientProvider<?> clusterClientProvider,
Function<ClusterClient<?>, CompletableFuture<T>> resultRetriever) {
@@ -132,5 +143,4 @@ public class ClusterClientJobClientAdapter<ClusterID> implements JobClient {
return resultFuture.whenCompleteAsync(
(jobResult, throwable) -> IOUtils.closeQuietly(clusterClient::close));
}
-
}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java
index 617ca08..e72c467 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java
@@ -25,12 +25,19 @@ import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequestGateway;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.util.SerializedValue;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -42,7 +49,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* uses directly the {@link DispatcherGateway}.
*/
@Internal
-public class EmbeddedJobClient implements JobClient {
+public class EmbeddedJobClient implements JobClient, CoordinationRequestGateway {
private final JobID jobId;
@@ -119,4 +126,15 @@ public class EmbeddedJobClient implements JobClient {
}
});
}
+
+ @Override
+ public CompletableFuture<CoordinationResponse> sendCoordinationRequest(OperatorID operatorId, CoordinationRequest request) {
+ try {
+ SerializedValue<CoordinationRequest> serializedRequest = new SerializedValue<>(request);
+ return dispatcherGateway.deliverCoordinationRequestToCoordinator(
+ jobId, operatorId, serializedRequest, timeout);
+ } catch (IOException e) {
+ return FutureUtils.completedExceptionally(e);
+ }
+ }
}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 952e428..e6843dc 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -23,8 +23,11 @@ import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.util.FlinkException;
import javax.annotation.Nullable;
@@ -162,4 +165,14 @@ public interface ClusterClient<T> extends AutoCloseable {
* @return path future where the savepoint is located
*/
CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory);
+
+ /**
+ * Sends out a request to a specified coordinator and returns the response.
+ *
+ * @param jobId specifies the job which the coordinator belongs to
+ * @param operatorId specifies which coordinator to receive the request
+ * @param request the request to send
+ * @return the response from the coordinator
+ */
+ CompletableFuture<CoordinationResponse> sendCoordinationRequest(JobID jobId, OperatorID operatorId, CoordinationRequest request);
}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
index 2eeb1f4..4c81681 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
@@ -24,12 +24,17 @@ import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,6 +42,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -155,6 +161,20 @@ public class MiniClusterClient implements ClusterClient<MiniClusterClient.MiniCl
}
}
+ @Override
+ public CompletableFuture<CoordinationResponse> sendCoordinationRequest(
+ JobID jobId,
+ OperatorID operatorId,
+ CoordinationRequest request) {
+ try {
+ SerializedValue<CoordinationRequest> serializedRequest = new SerializedValue<>(request);
+ return miniCluster.deliverCoordinationRequestToCoordinator(jobId, operatorId, serializedRequest);
+ } catch (IOException e) {
+ LOG.error("Error while sending coordination request", e);
+ return FutureUtils.completedExceptionally(e);
+ }
+ }
+
/**
* The type of the Cluster ID for the local {@link MiniCluster}.
*/
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java
index f8998a2..16ca6b6 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java
@@ -26,18 +26,25 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequestGateway;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -127,7 +134,7 @@ public final class PerJobMiniClusterFactory {
/**
* A {@link JobClient} for a {@link PerJobMiniClusterFactory}.
*/
- private static final class PerJobMiniClusterJobClient implements JobClient {
+ private static final class PerJobMiniClusterJobClient implements JobClient, CoordinationRequestGateway {
private final JobID jobID;
private final MiniCluster miniCluster;
@@ -182,5 +189,15 @@ public final class PerJobMiniClusterFactory {
}
});
}
+
+ @Override
+ public CompletableFuture<CoordinationResponse> sendCoordinationRequest(OperatorID operatorId, CoordinationRequest request) {
+ try {
+ SerializedValue<CoordinationRequest> serializedRequest = new SerializedValue<>(request);
+ return miniCluster.deliverCoordinationRequestToCoordinator(jobID, operatorId, serializedRequest);
+ } catch (IOException e) {
+ return FutureUtils.completedExceptionally(e);
+ }
+ }
}
}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index f2c6aa1..0b6601c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -37,9 +37,12 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.highavailability.ClientHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.rest.FileUpload;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
@@ -67,6 +70,9 @@ import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationHeaders;
+import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationRequestBody;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusMessageParameters;
@@ -88,6 +94,7 @@ import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.function.CheckedSupplier;
import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
@@ -411,6 +418,36 @@ public class RestClusterClient<T> implements ClusterClient<T> {
return triggerSavepoint(jobId, savepointDirectory, false);
}
+ @Override
+ public CompletableFuture<CoordinationResponse> sendCoordinationRequest(
+ JobID jobId,
+ OperatorID operatorId,
+ CoordinationRequest request) {
+ ClientCoordinationHeaders headers = ClientCoordinationHeaders.getInstance();
+ ClientCoordinationMessageParameters params = new ClientCoordinationMessageParameters();
+ params.jobPathParameter.resolve(jobId);
+ params.operatorPathParameter.resolve(operatorId);
+
+ SerializedValue<CoordinationRequest> serializedRequest;
+ try {
+ serializedRequest = new SerializedValue<>(request);
+ } catch (IOException e) {
+ return FutureUtils.completedExceptionally(e);
+ }
+
+ ClientCoordinationRequestBody requestBody = new ClientCoordinationRequestBody(serializedRequest);
+ return sendRequest(headers, params, requestBody).thenApply(
+ responseBody -> {
+ try {
+ return responseBody
+ .getSerializedCoordinationResponse()
+ .deserializeValue(getClass().getClassLoader());
+ } catch (IOException | ClassNotFoundException e) {
+ throw new CompletionException("Failed to deserialize coordination response", e);
+ }
+ });
+ }
+
private CompletableFuture<String> triggerSavepoint(
final JobID jobId,
final @Nullable String savepointDirectory,
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java b/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java
index 46df5ea..25124a0 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java
@@ -23,8 +23,11 @@ import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.util.function.TriFunction;
import javax.annotation.Nonnull;
@@ -133,6 +136,14 @@ public class TestingClusterClient<T> implements ClusterClient<T> {
}
@Override
+ public CompletableFuture<CoordinationResponse> sendCoordinationRequest(
+ JobID jobId,
+ OperatorID operatorId,
+ CoordinationRequest request) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public void close() {
}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index ff1f5ff..a55b9e7 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -39,10 +39,13 @@ import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.rest.FileUpload;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.RestClient;
@@ -76,6 +79,10 @@ import org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody
import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationHeaders;
+import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationRequestBody;
+import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationResponseBody;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusMessageParameters;
@@ -701,6 +708,74 @@ public class RestClusterClientTest extends TestLogger {
}
}
+ @Test
+ public void testSendCoordinationRequest() throws Exception {
+ final TestClientCoordinationHandler handler = new TestClientCoordinationHandler();
+
+ try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(handler)) {
+ RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
+
+ String payload = "testing payload";
+ TestCoordinationRequest<String> request = new TestCoordinationRequest<>(payload);
+ try {
+ CompletableFuture<CoordinationResponse> future =
+ restClusterClient.sendCoordinationRequest(jobId, new OperatorID(), request);
+ TestCoordinationResponse response = (TestCoordinationResponse) future.get();
+
+ assertEquals(payload, response.payload);
+ } finally {
+ restClusterClient.close();
+ }
+ }
+ }
+
+ private class TestClientCoordinationHandler extends TestHandler<ClientCoordinationRequestBody, ClientCoordinationResponseBody, ClientCoordinationMessageParameters> {
+
+ private TestClientCoordinationHandler() {
+ super(ClientCoordinationHeaders.getInstance());
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected CompletableFuture<ClientCoordinationResponseBody> handleRequest(@Nonnull HandlerRequest<ClientCoordinationRequestBody, ClientCoordinationMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
+ try {
+ TestCoordinationRequest req =
+ (TestCoordinationRequest) request
+ .getRequestBody()
+ .getSerializedCoordinationRequest()
+ .deserializeValue(getClass().getClassLoader());
+ TestCoordinationResponse resp = new TestCoordinationResponse(req.payload);
+ return CompletableFuture.completedFuture(
+ new ClientCoordinationResponseBody(
+ new SerializedValue<>(resp)));
+ } catch (Exception e) {
+ return FutureUtils.completedExceptionally(e);
+ }
+ }
+ }
+
+ private static class TestCoordinationRequest<T> implements CoordinationRequest {
+
+ private static final long serialVersionUID = 1L;
+
+ private final T payload;
+
+ private TestCoordinationRequest(T payload) {
+ this.payload = payload;
+ }
+ }
+
+ private static class TestCoordinationResponse<T> implements CoordinationResponse {
+
+ private static final long serialVersionUID = 1L;
+
+ private final T payload;
+
+ private TestCoordinationResponse(T payload) {
+ this.payload = payload;
+ }
+ }
+
private class TestAccumulatorHandler extends TestHandler<EmptyRequestBody, JobAccumulatorsInfo, JobAccumulatorsMessageParameters> {
public TestAccumulatorHandler() {
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index af55a7a..b14b741 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -1430,6 +1430,39 @@
"type" : "any"
}
}, {
+ "url" : "/jobs/:jobid/coordinators/:operatorid",
+ "method" : "POST",
+ "status-code" : "200 OK",
+ "file-upload" : false,
+ "path-parameters" : {
+ "pathParameters" : [ {
+ "key" : "jobid"
+ }, {
+ "key" : "operatorid"
+ } ]
+ },
+ "query-parameters" : {
+ "queryParameters" : [ ]
+ },
+ "request" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:coordination:ClientCoordinationRequestBody",
+ "properties" : {
+ "serializedCoordinationRequest" : {
+ "type" : "any"
+ }
+ }
+ },
+ "response" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:coordination:ClientCoordinationResponseBody",
+ "properties" : {
+ "serializedCoordinationResult" : {
+ "type" : "any"
+ }
+ }
+ }
+ }, {
"url" : "/jobs/:jobid/exceptions",
"method" : "GET",
"status-code" : "200 OK",
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 05aab6e..dde3908 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmanager.JobGraphWriter;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl;
@@ -55,6 +56,8 @@ import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceOverview;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
@@ -66,6 +69,7 @@ import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.CheckedSupplier;
import org.apache.flink.util.function.FunctionUtils;
@@ -625,6 +629,19 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
return CompletableFuture.completedFuture(Acknowledge.get());
}
+ @Override
+ public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
+ JobID jobId,
+ OperatorID operatorId,
+ SerializedValue<CoordinationRequest> serializedRequest,
+ Time timeout) {
+ final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
+
+ return jobMasterGatewayFuture.thenCompose(
+ (JobMasterGateway jobMasterGateway) ->
+ jobMasterGateway.deliverCoordinationRequestToCoordinator(operatorId, serializedRequest, timeout));
+ }
+
/**
* Cleans up the job related data from the dispatcher. If cleanupHA is true, then
* the data will also be removed from HA.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 543ef12..d5087cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -62,6 +62,8 @@ import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
@@ -718,6 +720,19 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
return CompletableFuture.completedFuture(aggregateFunction.getResult(accumulator));
}
+ @Override
+ public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
+ OperatorID operatorId,
+ SerializedValue<CoordinationRequest> serializedRequest,
+ Time timeout) {
+ try {
+ CoordinationRequest request = serializedRequest.deserializeValue(userCodeLoader);
+ return schedulerNG.deliverCoordinationRequestToCoordinator(operatorId, request);
+ } catch (Exception e) {
+ return FutureUtils.completedExceptionally(e);
+ }
+ }
+
//----------------------------------------------------------------------------------------------
// Internal methods
//----------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 1006dad..16decc3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -33,8 +33,11 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
@@ -44,6 +47,7 @@ import org.apache.flink.runtime.taskexecutor.AccumulatorReport;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
+import org.apache.flink.util.SerializedValue;
import javax.annotation.Nullable;
@@ -271,4 +275,19 @@ public interface JobMasterGateway extends
* @return The updated aggregate
*/
CompletableFuture<Object> updateGlobalAggregate(String aggregateName, Object aggregand, byte[] serializedAggregationFunction);
+
+ /**
+ * Deliver a coordination request to a specified coordinator and return the response.
+ *
+ * @param operatorId identifying the coordinator to receive the request
+ * @param serializedRequest serialized request to deliver
+ * @param timeout RPC timeout for this call
+ * @return A future containing the response.
+ * The future will fail with a {@link org.apache.flink.util.FlinkException}
+ * if the task is not running, or no operator/coordinator exists for the given ID,
+ * or the coordinator cannot handle client events.
+ */
+ CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
+ OperatorID operatorId,
+ SerializedValue<CoordinationRequest> serializedRequest,
+ @RpcTimeout Time timeout);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 556af4a..58e08ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -50,6 +50,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
@@ -60,6 +61,8 @@ import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.ReporterSetup;
import org.apache.flink.runtime.metrics.groups.ProcessMetricGroup;
import org.apache.flink.runtime.metrics.util.MetricUtils;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
@@ -79,6 +82,7 @@ import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.function.FunctionUtils;
import org.slf4j.Logger;
@@ -601,6 +605,16 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
return runDispatcherCommand(dispatcherGateway -> dispatcherGateway.requestJob(jobId, rpcTimeout));
}
+ /**
+ * Hands a serialized coordination request to the dispatcher gateway, using this
+ * cluster's {@code rpcTimeout} for the call.
+ *
+ * @param jobId ID of the job the request is addressed to
+ * @param operatorId ID of the operator whose coordinator should receive the request
+ * @param serializedRequest the serialized request to deliver
+ * @return a future holding the response returned through the dispatcher
+ */
+ public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
+ JobID jobId,
+ OperatorID operatorId,
+ SerializedValue<CoordinationRequest> serializedRequest) {
+ // Name the dispatcher command first, then run it against the current dispatcher gateway.
+ final Function<DispatcherGateway, CompletableFuture<CoordinationResponse>> deliverRequest =
+ gateway -> gateway.deliverCoordinationRequestToCoordinator(
+ jobId, operatorId, serializedRequest, rpcTimeout);
+ return runDispatcherCommand(deliverRequest);
+ }
+
private <T> CompletableFuture<T> runDispatcherCommand(Function<DispatcherGateway, CompletableFuture<T>> dispatcherCommand) {
return getDispatcherGatewayFuture().thenApply(dispatcherCommand).thenCompose(Function.identity());
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequest.java
new file mode 100644
index 0000000..a02678d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequest.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import java.io.Serializable;
+
+/**
+ * Root interface for all requests sent from the client to an {@link OperatorCoordinator}.
+ * Each request asks for a {@link CoordinationResponse}.
+ */
+public interface CoordinationRequest extends Serializable {}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequestGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequestGateway.java
new file mode 100644
index 0000000..0767569
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequestGateway.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Client interface which sends out a {@link CoordinationRequest} and
+ * expects a {@link CoordinationResponse} from an {@link OperatorCoordinator}.
+ */
+public interface CoordinationRequestGateway {
+
+ /**
+ * Send out a request to a specified coordinator and return the response.
+ *
+ * @param operatorId specifies which coordinator is to receive the request
+ * @param request the request to send
+ * @return a future containing the response from the coordinator
+ */
+ CompletableFuture<CoordinationResponse> sendCoordinationRequest(OperatorID operatorId, CoordinationRequest request);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequestHandler.java
new file mode 100644
index 0000000..3514d1e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequestHandler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Coordinator interface which can handle {@link CoordinationRequest}s
+ * and respond with {@link CoordinationResponse}s to the client.
+ */
+public interface CoordinationRequestHandler {
+
+ /**
+ * Called when receiving a request from the client.
+ *
+ * @param request the request received
+ * @return a future containing the response from the coordinator for this request
+ */
+ CompletableFuture<CoordinationResponse> handleCoordinationRequest(CoordinationRequest request);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationResponse.java
new file mode 100644
index 0000000..d28e30b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationResponse.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import java.io.Serializable;
+
+/**
+ * Root interface for all responses sent from an {@link OperatorCoordinator} to the client.
+ * Each response answers a {@link CoordinationRequest}.
+ */
+public interface CoordinationResponse extends Serializable {}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/coordination/ClientCoordinationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/coordination/ClientCoordinationHandler.java
new file mode 100644
index 0000000..e361f64
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/coordination/ClientCoordinationHandler.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.coordination;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.OperatorIDPathParameter;
+import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationRequestBody;
+import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationResponseBody;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/**
+ * Handler that receives the coordination requests from the client and returns the response from the coordinator.
+ */
+public class ClientCoordinationHandler extends AbstractRestHandler<RestfulGateway, ClientCoordinationRequestBody, ClientCoordinationResponseBody, ClientCoordinationMessageParameters> {
+
+ public ClientCoordinationHandler(
+ GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ Time timeout,
+ Map<String, String> responseHeaders,
+ MessageHeaders<ClientCoordinationRequestBody, ClientCoordinationResponseBody, ClientCoordinationMessageParameters> messageHeaders) {
+ super(leaderRetriever, timeout, responseHeaders, messageHeaders);
+ }
+
+ @Override
+ protected CompletableFuture<ClientCoordinationResponseBody> handleRequest(
+ @Nonnull HandlerRequest<ClientCoordinationRequestBody, ClientCoordinationMessageParameters> request,
+ @Nonnull RestfulGateway gateway) throws RestHandlerException {
+ JobID jobId = request.getPathParameter(JobIDPathParameter.class);
+ OperatorID operatorId = request.getPathParameter(OperatorIDPathParameter.class);
+ SerializedValue<CoordinationRequest> serializedRequest =
+ request.getRequestBody().getSerializedCoordinationRequest();
+ CompletableFuture<CoordinationResponse> responseFuture =
+ gateway.deliverCoordinationRequestToCoordinator(jobId, operatorId, serializedRequest, timeout);
+ return responseFuture.thenApply(
+ coordinationResponse -> {
+ try {
+ return new ClientCoordinationResponseBody(new SerializedValue<>(coordinationResponse));
+ } catch (IOException e) {
+ throw new CompletionException(
+ new RestHandlerException(
+ "Failed to serialize coordination response",
+ HttpResponseStatus.INTERNAL_SERVER_ERROR,
+ e));
+ }
+ });
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/OperatorIDPathParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/OperatorIDPathParameter.java
new file mode 100644
index 0000000..87d985c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/OperatorIDPathParameter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.util.StringUtils;
+
+/**
+ * Path parameter identifying operators.
+ */
+public class OperatorIDPathParameter extends MessagePathParameter<OperatorID> {
+
+ public static final String KEY = "operatorid";
+
+ public OperatorIDPathParameter() {
+ super(KEY);
+ }
+
+ /**
+ * Parses the hexadecimal string form of an operator ID.
+ *
+ * @param value hex string taken from the request path
+ * @return the parsed {@link OperatorID}
+ * @throws ConversionException if the value cannot be turned into an operator ID
+ */
+ @Override
+ protected OperatorID convertFromString(String value) throws ConversionException {
+ try {
+ return new OperatorID(StringUtils.hexStringToByte(value));
+ } catch (IllegalArgumentException e) {
+ // Also covers NumberFormatException from non-hex characters; report it through
+ // the exception type this method declares and keep the cause.
+ throw new ConversionException("Not a valid operator ID: " + value, e);
+ }
+ }
+
+ @Override
+ protected String convertToString(OperatorID value) {
+ return value.toString();
+ }
+
+ @Override
+ public String getDescription() {
+ return "string value that identifies an operator.";
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationHeaders.java
new file mode 100644
index 0000000..aded1e3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationHeaders.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.coordination;
+
+import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.coordination.ClientCoordinationHandler;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link ClientCoordinationHandler}.
+ *
+ * <p>Describes a POST endpoint at {@link #URL} that answers with status 200 (OK). The request body
+ * is a {@link ClientCoordinationRequestBody}, the response body a {@link ClientCoordinationResponseBody},
+ * and the parameters are described by {@link ClientCoordinationMessageParameters}.
+ *
+ * <p>The constructor is private; use the shared instance returned by {@link #getInstance()}.
+ */
+@Documentation.ExcludeFromDocumentation(
+ "This API is not exposed to the users, as coordinators are used only internally.")
+public class ClientCoordinationHeaders implements MessageHeaders<ClientCoordinationRequestBody, ClientCoordinationResponseBody, ClientCoordinationMessageParameters> {
+
+ public static final String URL = "/jobs/:jobid/coordinators/:operatorid";
+
+ private static final ClientCoordinationHeaders INSTANCE = new ClientCoordinationHeaders();
+
+ private ClientCoordinationHeaders() {}
+
+ @Override
+ public Class<ClientCoordinationRequestBody> getRequestClass() {
+ return ClientCoordinationRequestBody.class;
+ }
+
+ @Override
+ public Class<ClientCoordinationResponseBody> getResponseClass() {
+ return ClientCoordinationResponseBody.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public ClientCoordinationMessageParameters getUnresolvedMessageParameters() {
+ return new ClientCoordinationMessageParameters();
+ }
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.POST;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return URL;
+ }
+
+ public static ClientCoordinationHeaders getInstance() {
+ return INSTANCE;
+ }
+
+ @Override
+ public String getDescription() {
+ return "Send a request to a specified coordinator of the specified job and get the response. " +
+ "This API is for internal use only.";
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationMessageParameters.java
new file mode 100644
index 0000000..01be123
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationMessageParameters.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.coordination;
+
+import org.apache.flink.runtime.rest.handler.job.coordination.ClientCoordinationHandler;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.rest.messages.OperatorIDPathParameter;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * {@link MessageParameters} for {@link ClientCoordinationHandler}.
+ *
+ * <p>Exposes two path parameters, the job ID and the operator ID, and no query parameters.
+ */
+public class ClientCoordinationMessageParameters extends MessageParameters {
+
+ public final JobIDPathParameter jobPathParameter = new JobIDPathParameter();
+ public final OperatorIDPathParameter operatorPathParameter = new OperatorIDPathParameter();
+
+ @Override
+ public Collection<MessagePathParameter<?>> getPathParameters() {
+ return Arrays.asList(jobPathParameter, operatorPathParameter);
+ }
+
+ @Override
+ public Collection<MessageQueryParameter<?>> getQueryParameters() {
+ return Collections.emptyList();
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationRequestBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationRequestBody.java
new file mode 100644
index 0000000..e663320
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationRequestBody.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.coordination;
+
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.json.SerializedValueDeserializer;
+import org.apache.flink.runtime.rest.messages.json.SerializedValueSerializer;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+/**
+ * Request that carries a serialized {@link CoordinationRequest} to a specified coordinator.
+ *
+ * <p>The payload is stored under the JSON key {@link #FIELD_NAME_SERIALIZED_COORDINATION_REQUEST}
+ * and is written and read with {@link SerializedValueSerializer} and {@link SerializedValueDeserializer}.
+ * The getter is marked {@code @JsonIgnore}; the JSON mapping is declared on the field.
+ * The constructor performs no null check on the payload.
+ */
+public class ClientCoordinationRequestBody implements RequestBody {
+
+ public static final String FIELD_NAME_SERIALIZED_COORDINATION_REQUEST = "serializedCoordinationRequest";
+
+ @JsonProperty(FIELD_NAME_SERIALIZED_COORDINATION_REQUEST)
+ @JsonSerialize(using = SerializedValueSerializer.class)
+ @JsonDeserialize(using = SerializedValueDeserializer.class)
+ private final SerializedValue<CoordinationRequest> serializedCoordinationRequest;
+
+ @JsonCreator
+ public ClientCoordinationRequestBody(
+ @JsonProperty(FIELD_NAME_SERIALIZED_COORDINATION_REQUEST)
+ SerializedValue<CoordinationRequest> serializedCoordinationRequest) {
+ this.serializedCoordinationRequest = serializedCoordinationRequest;
+ }
+
+ @JsonIgnore
+ public SerializedValue<CoordinationRequest> getSerializedCoordinationRequest() {
+ return serializedCoordinationRequest;
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationResponseBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationResponseBody.java
new file mode 100644
index 0000000..5d1cdf2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationResponseBody.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.coordination;
+
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.json.SerializedValueDeserializer;
+import org.apache.flink.runtime.rest.messages.json.SerializedValueSerializer;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+/**
+ * Response that carries a serialized {@link CoordinationResponse} to the client.
+ *
+ * <p>The payload is stored under the JSON key {@link #FIELD_NAME_SERIALIZED_COORDINATION_RESULT}
+ * and is written and read with {@link SerializedValueSerializer} and {@link SerializedValueDeserializer}.
+ * The getter is marked {@code @JsonIgnore}; the JSON mapping is declared on the field.
+ *
+ * <p>Note that the JSON key reads "serializedCoordinationResult" while the field and getter are
+ * named "...Response". The key is part of the wire format, so the two should not be aligned casually.
+ */
+public class ClientCoordinationResponseBody implements ResponseBody {
+
+ public static final String FIELD_NAME_SERIALIZED_COORDINATION_RESULT = "serializedCoordinationResult";
+
+ @JsonProperty(FIELD_NAME_SERIALIZED_COORDINATION_RESULT)
+ @JsonSerialize(using = SerializedValueSerializer.class)
+ @JsonDeserialize(using = SerializedValueDeserializer.class)
+ private final SerializedValue<CoordinationResponse> serializedCoordinationResponse;
+
+ @JsonCreator
+ public ClientCoordinationResponseBody(
+ @JsonProperty(FIELD_NAME_SERIALIZED_COORDINATION_RESULT)
+ SerializedValue<CoordinationResponse> serializedCoordinationResponse) {
+ this.serializedCoordinationResponse = serializedCoordinationResponse;
+ }
+
+ @JsonIgnore
+ public SerializedValue<CoordinationResponse> getSerializedCoordinationResponse() {
+ return serializedCoordinationResponse;
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 2be0e5c..7cc17dc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -76,6 +76,9 @@ import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
@@ -107,6 +110,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -164,6 +168,8 @@ public abstract class SchedulerBase implements SchedulerNG {
protected final ExecutionVertexVersioner executionVertexVersioner;
+ private final Map<OperatorID, OperatorCoordinator> coordinatorMap;
+
private ComponentMainThreadExecutor mainThreadExecutor = new ComponentMainThreadExecutor.DummyComponentMainThreadExecutor(
"SchedulerBase is not initialized with proper main thread executor. " +
"Call to SchedulerBase.setMainThreadExecutor(...) required.");
@@ -222,6 +228,8 @@ public abstract class SchedulerBase implements SchedulerNG {
this.schedulingTopology = executionGraph.getSchedulingTopology();
this.inputsLocationsRetriever = new ExecutionGraphToInputsLocationsRetrieverAdapter(executionGraph);
+
+ this.coordinatorMap = createCoordinatorMap();
}
private ExecutionGraph createAndRestoreExecutionGraph(
@@ -932,6 +940,20 @@ public abstract class SchedulerBase implements SchedulerNG {
}
}
+ @Override
+ public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
+ OperatorID operator,
+ CoordinationRequest request) throws FlinkException {
+ final OperatorCoordinator target = coordinatorMap.get(operator);
+ // No coordinator is registered under this operator ID.
+ if (target == null) {
+ throw new FlinkException("Coordinator of operator " + operator + " does not exist");
+ }
+ // A coordinator exists, but it does not implement the client request interface.
+ if (!(target instanceof CoordinationRequestHandler)) {
+ throw new FlinkException("Coordinator of operator " + operator + " cannot handle client event");
+ }
+ return ((CoordinationRequestHandler) target).handleCoordinationRequest(request);
+ }
+
private void startAllOperatorCoordinators() {
final Collection<OperatorCoordinator> coordinators = getAllCoordinators();
try {
@@ -951,8 +973,16 @@ public abstract class SchedulerBase implements SchedulerNG {
}
private Collection<OperatorCoordinator> getAllCoordinators() {
- return getExecutionGraph().getAllVertices().values().stream()
- .flatMap((vertex) -> vertex.getOperatorCoordinators().stream())
- .collect(Collectors.toList());
+ return coordinatorMap.values();
+ }
+
+ private Map<OperatorID, OperatorCoordinator> createCoordinatorMap() {
+ Map<OperatorID, OperatorCoordinator> coordinatorMap = new HashMap<>();
+ for (ExecutionJobVertex vertex : getExecutionGraph().getAllVertices().values()) {
+ for (Map.Entry<OperatorID, OperatorCoordinator> entry : vertex.getOperatorCoordinatorMap().entrySet()) {
+ coordinatorMap.put(entry.getKey(), entry.getValue());
+ }
+ }
+ return coordinatorMap;
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
index 5a75d37..07f7e7e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
@@ -40,6 +40,8 @@ import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.query.KvStateLocation;
@@ -138,4 +140,14 @@ public interface SchedulerNG {
* for the given ID.
*/
void deliverOperatorEventToCoordinator(ExecutionAttemptID taskExecution, OperatorID operator, OperatorEvent evt) throws FlinkException;
+
+ /**
+ * Delivers a coordination request to the {@link OperatorCoordinator} with the given {@link OperatorID}
+ * and returns the coordinator's response.
+ *
+ * @return A future containing the response.
+ * @throws FlinkException Thrown, if no operator/coordinator exists for the given ID,
+ * or the coordinator cannot handle coordination requests.
+ */
+ CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(OperatorID operator, CoordinationRequest request) throws FlinkException;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index be0e45a..c736565 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -26,15 +26,19 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.metrics.dump.MetricQueryService;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.util.SerializedValue;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
@@ -186,4 +190,24 @@ public interface RestfulGateway extends RpcGateway {
default CompletableFuture<Acknowledge> shutDownCluster() {
throw new UnsupportedOperationException();
}
+
+ /**
+ * Deliver a coordination request to a specified coordinator and return the response.
+ *
+ * @param jobId identifying the job which the coordinator belongs to
+ * @param operatorId identifying the coordinator to receive the request
+ * @param serializedRequest serialized request to deliver
+ * @param timeout RPC timeout
+ * @return A future containing the response.
+ * The future will be completed exceptionally with a {@link org.apache.flink.util.FlinkException}
+ * if the task is not running, or no operator/coordinator exists for the given ID,
+ * or the coordinator cannot handle client events.
+ */
+ default CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
+ JobID jobId,
+ OperatorID operatorId,
+ SerializedValue<CoordinationRequest> serializedRequest,
+ @RpcTimeout Time timeout) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index c17b2b7..fd67f86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -64,6 +64,7 @@ import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatistic
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler;
+import org.apache.flink.runtime.rest.handler.job.coordination.ClientCoordinationHandler;
import org.apache.flink.runtime.rest.handler.job.metrics.AggregatingJobsMetricsHandler;
import org.apache.flink.runtime.rest.handler.job.metrics.AggregatingSubtasksMetricsHandler;
import org.apache.flink.runtime.rest.handler.job.metrics.AggregatingTaskManagersMetricsHandler;
@@ -119,6 +120,7 @@ import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
import org.apache.flink.runtime.rest.messages.job.SubtaskCurrentAttemptDetailsHeaders;
import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumulatorsHeaders;
import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsHeaders;
+import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerCustomLogHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerLogFileHeaders;
@@ -559,6 +561,12 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
final ClusterDataSetDeleteHandlers.ClusterDataSetDeleteStatusHandler clusterDataSetDeleteStatusHandler =
clusterDataSetDeleteHandlers.new ClusterDataSetDeleteStatusHandler(leaderRetriever, timeout, responseHeaders);
+ final ClientCoordinationHandler clientCoordinationHandler = new ClientCoordinationHandler(
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ ClientCoordinationHeaders.getInstance());
+
final ShutdownHandler shutdownHandler = new ShutdownHandler(
leaderRetriever,
timeout,
@@ -625,6 +633,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
handlers.add(Tuple2.of(clusterDataSetListHandler.getMessageHeaders(), clusterDataSetListHandler));
handlers.add(Tuple2.of(clusterDataSetDeleteTriggerHandler.getMessageHeaders(), clusterDataSetDeleteTriggerHandler));
handlers.add(Tuple2.of(clusterDataSetDeleteStatusHandler.getMessageHeaders(), clusterDataSetDeleteStatusHandler));
+ handlers.add(Tuple2.of(clientCoordinationHandler.getMessageHeaders(), clientCoordinationHandler));
// TODO: Remove once the Yarn proxy can forward all REST verbs
handlers.add(Tuple2.of(YarnCancelJobTerminationHeaders.getInstance(), yarnJobCancelTerminationHandler));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
index 47b9206..59b3cc5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
@@ -42,6 +42,8 @@ import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.registration.RegistrationResponse;
@@ -157,6 +159,9 @@ public class TestingJobMasterGateway implements JobMasterGateway {
@Nonnull
private final TriFunction<ExecutionAttemptID, OperatorID, SerializedValue<OperatorEvent>, CompletableFuture<Acknowledge>> operatorEventSender;
+ @Nonnull
+ private final BiFunction<OperatorID, SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> deliverCoordinationRequestFunction;
+
public TestingJobMasterGateway(
@Nonnull String address,
@Nonnull String hostname,
@@ -185,7 +190,8 @@ public class TestingJobMasterGateway implements JobMasterGateway {
@Nonnull Function<Tuple6<JobID, JobVertexID, KeyGroupRange, String, KvStateID, InetSocketAddress>, CompletableFuture<Acknowledge>> notifyKvStateRegisteredFunction,
@Nonnull Function<Tuple4<JobID, JobVertexID, KeyGroupRange, String>, CompletableFuture<Acknowledge>> notifyKvStateUnregisteredFunction,
@Nonnull TriFunction<String, Object, byte[], CompletableFuture<Object>> updateAggregateFunction,
- @Nonnull TriFunction<ExecutionAttemptID, OperatorID, SerializedValue<OperatorEvent>, CompletableFuture<Acknowledge>> operatorEventSender) {
+ @Nonnull TriFunction<ExecutionAttemptID, OperatorID, SerializedValue<OperatorEvent>, CompletableFuture<Acknowledge>> operatorEventSender,
+ @Nonnull BiFunction<OperatorID, SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> deliverCoordinationRequestFunction) {
this.address = address;
this.hostname = hostname;
this.cancelFunction = cancelFunction;
@@ -214,6 +220,7 @@ public class TestingJobMasterGateway implements JobMasterGateway {
this.notifyKvStateUnregisteredFunction = notifyKvStateUnregisteredFunction;
this.updateAggregateFunction = updateAggregateFunction;
this.operatorEventSender = operatorEventSender;
+ this.deliverCoordinationRequestFunction = deliverCoordinationRequestFunction;
}
@Override
@@ -360,4 +367,9 @@ public class TestingJobMasterGateway implements JobMasterGateway {
public CompletableFuture<Acknowledge> sendOperatorEventToCoordinator(ExecutionAttemptID task, OperatorID operatorID, SerializedValue<OperatorEvent> event) {
return operatorEventSender.apply(task, operatorID, event);
}
+
+ @Override
+ public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(OperatorID operatorId, SerializedValue<CoordinationRequest> serializedRequest, Time timeout) {
+ return deliverCoordinationRequestFunction.apply(operatorId, serializedRequest);
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
index a960448..20202cd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
@@ -41,6 +41,8 @@ import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
@@ -102,6 +104,7 @@ public class TestingJobMasterGatewayBuilder {
private Function<Tuple4<JobID, JobVertexID, KeyGroupRange, String>, CompletableFuture<Acknowledge>> notifyKvStateUnregisteredFunction = ignored -> CompletableFuture.completedFuture(Acknowledge.get());
private TriFunction<String, Object, byte[], CompletableFuture<Object>> updateAggregateFunction = (a, b, c) -> CompletableFuture.completedFuture(new Object());
private TriFunction<ExecutionAttemptID, OperatorID, SerializedValue<OperatorEvent>, CompletableFuture<Acknowledge>> operatorEventSender = (a, b, c) -> CompletableFuture.completedFuture(Acknowledge.get());
+ private BiFunction<OperatorID, SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> deliverCoordinationRequestFunction = (a, b) -> FutureUtils.completedExceptionally(new UnsupportedOperationException());
public TestingJobMasterGatewayBuilder setAddress(String address) {
this.address = address;
@@ -243,6 +246,11 @@ public class TestingJobMasterGatewayBuilder {
return this;
}
+ public TestingJobMasterGatewayBuilder setDeliverCoordinationRequestFunction(BiFunction<OperatorID, SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> deliverCoordinationRequestFunction) {
+ this.deliverCoordinationRequestFunction = deliverCoordinationRequestFunction;
+ return this;
+ }
+
public TestingJobMasterGateway build() {
return new TestingJobMasterGateway(
address,
@@ -272,6 +280,7 @@ public class TestingJobMasterGatewayBuilder {
notifyKvStateRegisteredFunction,
notifyKvStateUnregisteredFunction,
updateAggregateFunction,
- operatorEventSender);
+ operatorEventSender,
+ deliverCoordinationRequestFunction);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index 17a82f0..8ecf172 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.operators.coordination;
+import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
@@ -35,6 +36,7 @@ import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
@@ -163,6 +165,52 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
assertThat(result, futureFailedWith(TestException.class));
}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testDeliveringClientRequestToRequestHandler() throws Exception {
+ final OperatorCoordinator.Provider provider = new TestingCoordinationRequestHandler.Provider(testOperatorId);
+ final DefaultScheduler scheduler = createScheduler(provider);
+
+ final String payload = "testing payload";
+ final TestingCoordinationRequestHandler.Request<String> request =
+ new TestingCoordinationRequestHandler.Request<>(payload);
+ final TestingCoordinationRequestHandler.Response<String> response =
+ (TestingCoordinationRequestHandler.Response<String>)
+ scheduler.deliverCoordinationRequestToCoordinator(testOperatorId, request).get();
+
+ assertEquals(payload, response.getPayload());
+ }
+
+ @Test
+ public void testDeliveringClientRequestToNonRequestHandler() throws Exception {
+ final OperatorCoordinator.Provider provider = new TestingOperatorCoordinator.Provider(testOperatorId);
+ final DefaultScheduler scheduler = createScheduler(provider);
+
+ final String payload = "testing payload";
+ final TestingCoordinationRequestHandler.Request<String> request =
+ new TestingCoordinationRequestHandler.Request<>(payload);
+
+ CommonTestUtils.assertThrows(
+ "cannot handle client event",
+ FlinkException.class,
+ () -> scheduler.deliverCoordinationRequestToCoordinator(testOperatorId, request));
+ }
+
+ @Test
+ public void testDeliveringClientRequestToNonExistingCoordinator() throws Exception {
+ final OperatorCoordinator.Provider provider = new TestingOperatorCoordinator.Provider(testOperatorId);
+ final DefaultScheduler scheduler = createScheduler(provider);
+
+ final String payload = "testing payload";
+ final TestingCoordinationRequestHandler.Request<String> request =
+ new TestingCoordinationRequestHandler.Request<>(payload);
+
+ CommonTestUtils.assertThrows(
+ "does not exist",
+ FlinkException.class,
+ () -> scheduler.deliverCoordinationRequestToCoordinator(new OperatorID(), request));
+ }
+
// ------------------------------------------------------------------------
// test setups
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingCoordinationRequestHandler.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingCoordinationRequestHandler.java
new file mode 100644
index 0000000..4aa3c43
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingCoordinationRequestHandler.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A simple testing implementation of the {@link CoordinationRequestHandler}.
+ */
+public class TestingCoordinationRequestHandler extends TestingOperatorCoordinator implements CoordinationRequestHandler {
+
+ public TestingCoordinationRequestHandler(Context context) {
+ super(context);
+ }
+
+ @Override
+ public CompletableFuture<CoordinationResponse> handleCoordinationRequest(CoordinationRequest request) {
+ Request req = (Request) request;
+ return CompletableFuture.completedFuture(new Response<>(req.getPayload()));
+ }
+
+ /**
+ * A testing stub for an {@link OperatorCoordinator.Provider} that creates a
+ * {@link TestingCoordinationRequestHandler}.
+ */
+ public static final class Provider implements OperatorCoordinator.Provider {
+
+ private static final long serialVersionUID = 1L;
+
+ private final OperatorID operatorId;
+
+ public Provider(OperatorID operatorId) {
+ this.operatorId = operatorId;
+ }
+
+ @Override
+ public OperatorID getOperatorId() {
+ return operatorId;
+ }
+
+ @Override
+ public OperatorCoordinator create(Context context) {
+ return new TestingCoordinationRequestHandler(context);
+ }
+ }
+
+ /**
+ * A {@link CoordinationRequest} that a {@link TestingCoordinationRequestHandler} receives.
+ *
+ * @param <T> payload type
+ */
+ public static class Request<T> implements CoordinationRequest {
+
+ private static final long serialVersionUID = 1L;
+
+ private final T payload;
+
+ public Request(T payload) {
+ this.payload = payload;
+ }
+
+ public T getPayload() {
+ return payload;
+ }
+ }
+
+ /**
+ * A {@link CoordinationResponse} that a {@link TestingCoordinationRequestHandler} gives.
+ *
+ * @param <T> payload type
+ */
+ public static class Response<T> implements CoordinationResponse {
+
+ private static final long serialVersionUID = 1L;
+
+ private final T payload;
+
+ public Response(T payload) {
+ this.payload = payload;
+ }
+
+ public T getPayload() {
+ return payload;
+ }
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
index 4afd56d..becf513 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
@@ -29,12 +29,17 @@ import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.function.TriFunction;
import java.util.Collection;
import java.util.Collections;
@@ -92,7 +97,8 @@ public final class TestingDispatcherGateway extends TestingRestfulGateway implem
DispatcherId fencingToken,
Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestArchivedJobFunction,
Supplier<CompletableFuture<Acknowledge>> clusterShutdownSupplier,
- Function<ApplicationStatus, CompletableFuture<Acknowledge>> clusterShutdownWithStatusFunction) {
+ Function<ApplicationStatus, CompletableFuture<Acknowledge>> clusterShutdownWithStatusFunction,
+ TriFunction<JobID, OperatorID, SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> deliverCoordinationRequestToCoordinatorFunction) {
super(
address,
hostname,
@@ -107,7 +113,8 @@ public final class TestingDispatcherGateway extends TestingRestfulGateway implem
requestOperatorBackPressureStatsFunction,
triggerSavepointFunction,
stopWithSavepointFunction,
- clusterShutdownSupplier);
+ clusterShutdownSupplier,
+ deliverCoordinationRequestToCoordinatorFunction);
this.submitFunction = submitFunction;
this.listFunction = listFunction;
this.blobServerPort = blobServerPort;
@@ -219,7 +226,8 @@ public final class TestingDispatcherGateway extends TestingRestfulGateway implem
fencingToken,
requestArchivedJobFunction,
clusterShutdownSupplier,
- clusterShutdownWithStatusFunction);
+ clusterShutdownWithStatusFunction,
+ deliverCoordinationRequestToCoordinatorFunction);
}
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
index b7cf02d..0acc5ac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
@@ -26,12 +26,17 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.function.TriFunction;
import java.util.Collection;
import java.util.Collections;
@@ -57,6 +62,7 @@ public class TestingRestfulGateway implements RestfulGateway {
static final BiFunction<JobID, JobVertexID, CompletableFuture<OperatorBackPressureStatsResponse>> DEFAULT_REQUEST_OPERATOR_BACK_PRESSURE_STATS_SUPPLIER = (jobId, jobVertexId) -> FutureUtils.completedExceptionally(new UnsupportedOperationException());
static final BiFunction<JobID, String, CompletableFuture<String>> DEFAULT_TRIGGER_SAVEPOINT_FUNCTION = (JobID jobId, String targetDirectory) -> FutureUtils.completedExceptionally(new UnsupportedOperationException());
static final BiFunction<JobID, String, CompletableFuture<String>> DEFAULT_STOP_WITH_SAVEPOINT_FUNCTION = (JobID jobId, String targetDirectory) -> FutureUtils.completedExceptionally(new UnsupportedOperationException());
+ static final TriFunction<JobID, OperatorID, SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> DEFAULT_DELIVER_COORDINATION_REQUEST_TO_COORDINATOR_FUNCTION = (JobID jobId, OperatorID operatorId, SerializedValue<CoordinationRequest> serializedRequest) -> FutureUtils.completedExceptionally(new UnsupportedOperationException());
static final String LOCALHOST = "localhost";
protected String address;
@@ -89,6 +95,8 @@ public class TestingRestfulGateway implements RestfulGateway {
protected BiFunction<JobID, String, CompletableFuture<String>> stopWithSavepointFunction;
+ protected TriFunction<JobID, OperatorID, SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> deliverCoordinationRequestToCoordinatorFunction;
+
public TestingRestfulGateway() {
this(
LOCALHOST,
@@ -104,7 +112,8 @@ public class TestingRestfulGateway implements RestfulGateway {
DEFAULT_REQUEST_OPERATOR_BACK_PRESSURE_STATS_SUPPLIER,
DEFAULT_TRIGGER_SAVEPOINT_FUNCTION,
DEFAULT_STOP_WITH_SAVEPOINT_FUNCTION,
- DEFAULT_CLUSTER_SHUTDOWN_SUPPLIER);
+ DEFAULT_CLUSTER_SHUTDOWN_SUPPLIER,
+ DEFAULT_DELIVER_COORDINATION_REQUEST_TO_COORDINATOR_FUNCTION);
}
public TestingRestfulGateway(
@@ -121,7 +130,8 @@ public class TestingRestfulGateway implements RestfulGateway {
BiFunction<JobID, JobVertexID, CompletableFuture<OperatorBackPressureStatsResponse>> requestOperatorBackPressureStatsFunction,
BiFunction<JobID, String, CompletableFuture<String>> triggerSavepointFunction,
BiFunction<JobID, String, CompletableFuture<String>> stopWithSavepointFunction,
- Supplier<CompletableFuture<Acknowledge>> clusterShutdownSupplier) {
+ Supplier<CompletableFuture<Acknowledge>> clusterShutdownSupplier,
+ TriFunction<JobID, OperatorID, SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> deliverCoordinationRequestToCoordinatorFunction) {
this.address = address;
this.hostname = hostname;
this.cancelJobFunction = cancelJobFunction;
@@ -136,6 +146,7 @@ public class TestingRestfulGateway implements RestfulGateway {
this.triggerSavepointFunction = triggerSavepointFunction;
this.stopWithSavepointFunction = stopWithSavepointFunction;
this.clusterShutdownSupplier = clusterShutdownSupplier;
+ this.deliverCoordinationRequestToCoordinatorFunction = deliverCoordinationRequestToCoordinatorFunction;
}
@Override
@@ -199,6 +210,15 @@ public class TestingRestfulGateway implements RestfulGateway {
}
@Override
+ public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
+ JobID jobId,
+ OperatorID operatorId,
+ SerializedValue<CoordinationRequest> serializedRequest,
+ Time timeout) {
+ return deliverCoordinationRequestToCoordinatorFunction.apply(jobId, operatorId, serializedRequest);
+ }
+
+ @Override
public String getAddress() {
return address;
}
@@ -229,6 +249,7 @@ public class TestingRestfulGateway implements RestfulGateway {
protected BiFunction<JobID, JobVertexID, CompletableFuture<OperatorBackPressureStatsResponse>> requestOperatorBackPressureStatsFunction;
protected BiFunction<JobID, String, CompletableFuture<String>> triggerSavepointFunction;
protected BiFunction<JobID, String, CompletableFuture<String>> stopWithSavepointFunction;
+ protected TriFunction<JobID, OperatorID, SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> deliverCoordinationRequestToCoordinatorFunction;
protected AbstractBuilder() {
cancelJobFunction = DEFAULT_CANCEL_JOB_FUNCTION;
@@ -243,6 +264,7 @@ public class TestingRestfulGateway implements RestfulGateway {
triggerSavepointFunction = DEFAULT_TRIGGER_SAVEPOINT_FUNCTION;
stopWithSavepointFunction = DEFAULT_STOP_WITH_SAVEPOINT_FUNCTION;
clusterShutdownSupplier = DEFAULT_CLUSTER_SHUTDOWN_SUPPLIER;
+ deliverCoordinationRequestToCoordinatorFunction = DEFAULT_DELIVER_COORDINATION_REQUEST_TO_COORDINATOR_FUNCTION;
}
public T setAddress(String address) {
@@ -315,6 +337,11 @@ public class TestingRestfulGateway implements RestfulGateway {
return self();
}
+ public T setDeliverCoordinationRequestToCoordinatorFunction(TriFunction<JobID, OperatorID, SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> deliverCoordinationRequestToCoordinatorFunction) {
+ this.deliverCoordinationRequestToCoordinatorFunction = deliverCoordinationRequestToCoordinatorFunction;
+ return self();
+ }
+
protected abstract T self();
public abstract TestingRestfulGateway build();
@@ -346,7 +373,8 @@ public class TestingRestfulGateway implements RestfulGateway {
requestOperatorBackPressureStatsFunction,
triggerSavepointFunction,
stopWithSavepointFunction,
- clusterShutdownSupplier);
+ clusterShutdownSupplier,
+ deliverCoordinationRequestToCoordinatorFunction);
}
}
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
index 00bc2b9..e548f5e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
@@ -31,10 +31,13 @@ import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.jobmaster.JobResult.Builder;
import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -274,6 +277,14 @@ public class RemoteStreamEnvironmentTest extends TestLogger {
@Nullable String savepointDirectory) {
return null;
}
+
+ @Override
+ public CompletableFuture<CoordinationResponse> sendCoordinationRequest(
+ JobID jobId,
+ OperatorID operatorId,
+ CoordinationRequest request) {
+ return null;
+ }
}
}