You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2020/05/14 23:47:22 UTC

[flink] branch master updated: [FLINK-14807][rest] Introduce REST API for communication between clients and operator coordinators

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9fe920f  [FLINK-14807][rest] Introduce REST API for communication between clients and operator coordinators
9fe920f is described below

commit 9fe920fdc6f7eeaf2d99901099c842cfd0f1380a
Author: TsReaper <ts...@gmail.com>
AuthorDate: Fri May 15 07:46:31 2020 +0800

    [FLINK-14807][rest] Introduce REST API for communication between clients and operator coordinators
    
    This closes #12037
---
 .../deployment/ClusterClientJobClientAdapter.java  |  14 ++-
 .../deployment/application/EmbeddedJobClient.java  |  20 +++-
 .../apache/flink/client/program/ClusterClient.java |  13 +++
 .../flink/client/program/MiniClusterClient.java    |  20 ++++
 .../client/program/PerJobMiniClusterFactory.java   |  19 +++-
 .../client/program/rest/RestClusterClient.java     |  37 ++++++++
 .../flink/client/program/TestingClusterClient.java |  11 +++
 .../client/program/rest/RestClusterClientTest.java |  75 +++++++++++++++
 .../src/test/resources/rest_api_v1.snapshot        |  33 +++++++
 .../flink/runtime/dispatcher/Dispatcher.java       |  17 ++++
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  15 +++
 .../flink/runtime/jobmaster/JobMasterGateway.java  |  19 ++++
 .../flink/runtime/minicluster/MiniCluster.java     |  14 +++
 .../coordination/CoordinationRequest.java          |  27 ++++++
 .../coordination/CoordinationRequestGateway.java   |  39 ++++++++
 .../coordination/CoordinationRequestHandler.java   |  36 +++++++
 .../coordination/CoordinationResponse.java         |  27 ++++++
 .../coordination/ClientCoordinationHandler.java    |  84 +++++++++++++++++
 .../rest/messages/OperatorIDPathParameter.java     |  49 ++++++++++
 .../coordination/ClientCoordinationHeaders.java    |  80 ++++++++++++++++
 .../ClientCoordinationMessageParameters.java       |  49 ++++++++++
 .../ClientCoordinationRequestBody.java             |  56 +++++++++++
 .../ClientCoordinationResponseBody.java            |  56 +++++++++++
 .../flink/runtime/scheduler/SchedulerBase.java     |  36 ++++++-
 .../flink/runtime/scheduler/SchedulerNG.java       |  12 +++
 .../flink/runtime/webmonitor/RestfulGateway.java   |  24 +++++
 .../runtime/webmonitor/WebMonitorEndpoint.java     |   9 ++
 .../jobmaster/utils/TestingJobMasterGateway.java   |  14 ++-
 .../utils/TestingJobMasterGatewayBuilder.java      |  11 ++-
 .../OperatorCoordinatorSchedulerTest.java          |  48 ++++++++++
 .../TestingCoordinationRequestHandler.java         | 104 +++++++++++++++++++++
 .../webmonitor/TestingDispatcherGateway.java       |  14 ++-
 .../runtime/webmonitor/TestingRestfulGateway.java  |  34 ++++++-
 .../environment/RemoteStreamEnvironmentTest.java   |  11 +++
 34 files changed, 1112 insertions(+), 15 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
index 3fb0e5c..94c31f2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
@@ -26,6 +26,10 @@ import org.apache.flink.client.program.ClusterClientProvider;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequestGateway;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 
 import org.apache.commons.io.IOUtils;
 
@@ -41,7 +45,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * An implementation of the {@link JobClient} interface that uses a {@link ClusterClient} underneath..
  */
-public class ClusterClientJobClientAdapter<ClusterID> implements JobClient {
+public class ClusterClientJobClientAdapter<ClusterID> implements JobClient, CoordinationRequestGateway {
 
 	private final ClusterClientProvider<ClusterID> clusterClientProvider;
 
@@ -115,6 +119,13 @@ public class ClusterClientJobClientAdapter<ClusterID> implements JobClient {
 					})));
 	}
 
+	@Override
+	public CompletableFuture<CoordinationResponse> sendCoordinationRequest(OperatorID operatorId, CoordinationRequest request) {
+		return bridgeClientRequest(
+			clusterClientProvider,
+			clusterClient -> clusterClient.sendCoordinationRequest(jobID, operatorId, request));
+	}
+
 	private static <T> CompletableFuture<T> bridgeClientRequest(
 			ClusterClientProvider<?> clusterClientProvider,
 			Function<ClusterClient<?>, CompletableFuture<T>> resultRetriever) {
@@ -132,5 +143,4 @@ public class ClusterClientJobClientAdapter<ClusterID> implements JobClient {
 		return resultFuture.whenCompleteAsync(
 				(jobResult, throwable) -> IOUtils.closeQuietly(clusterClient::close));
 	}
-
 }
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java
index 617ca08..e72c467 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java
@@ -25,12 +25,19 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequestGateway;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.util.SerializedValue;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -42,7 +49,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * uses directly the {@link DispatcherGateway}.
  */
 @Internal
-public class EmbeddedJobClient implements JobClient {
+public class EmbeddedJobClient implements JobClient, CoordinationRequestGateway {
 
 	private final JobID jobId;
 
@@ -119,4 +126,15 @@ public class EmbeddedJobClient implements JobClient {
 					}
 				});
 	}
+
+	@Override
+	public CompletableFuture<CoordinationResponse> sendCoordinationRequest(OperatorID operatorId, CoordinationRequest request) {
+		try {
+			SerializedValue<CoordinationRequest> serializedRequest = new SerializedValue<>(request);
+			return dispatcherGateway.deliverCoordinationRequestToCoordinator(
+				jobId, operatorId, serializedRequest, timeout);
+		} catch (IOException e) {
+			return FutureUtils.completedExceptionally(e);
+		}
+	}
 }
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 952e428..e6843dc 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -23,8 +23,11 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.util.FlinkException;
 
 import javax.annotation.Nullable;
@@ -162,4 +165,14 @@ public interface ClusterClient<T> extends AutoCloseable {
 	 * @return path future where the savepoint is located
 	 */
 	CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory);
+
+	/**
+	 * Sends out a request to a specified coordinator and returns the response.
+	 *
+	 * @param jobId specifies the job which the coordinator belongs to
+	 * @param operatorId specifies which coordinator to receive the request
+	 * @param request the request to send
+	 * @return the response from the coordinator
+	 */
+	CompletableFuture<CoordinationResponse> sendCoordinationRequest(JobID jobId, OperatorID operatorId, CoordinationRequest request);
 }
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
index 2eeb1f4..4c81681 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
@@ -24,12 +24,17 @@ import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.SerializedValue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,6 +42,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -155,6 +161,20 @@ public class MiniClusterClient implements ClusterClient<MiniClusterClient.MiniCl
 		}
 	}
 
+	@Override
+	public CompletableFuture<CoordinationResponse> sendCoordinationRequest(
+			JobID jobId,
+			OperatorID operatorId,
+			CoordinationRequest request) {
+		try {
+			SerializedValue<CoordinationRequest> serializedRequest = new SerializedValue<>(request);
+			return miniCluster.deliverCoordinationRequestToCoordinator(jobId, operatorId, serializedRequest);
+		} catch (IOException e) {
+			LOG.error("Error while sending coordination request", e);
+			return FutureUtils.completedExceptionally(e);
+		}
+	}
+
 	/**
 	 * The type of the Cluster ID for the local {@link MiniCluster}.
 	 */
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java
index f8998a2..16ca6b6 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java
@@ -26,18 +26,25 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequestGateway;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.SerializedValue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -127,7 +134,7 @@ public final class PerJobMiniClusterFactory {
 	/**
 	 * A {@link JobClient} for a {@link PerJobMiniClusterFactory}.
 	 */
-	private static final class PerJobMiniClusterJobClient implements JobClient {
+	private static final class PerJobMiniClusterJobClient implements JobClient, CoordinationRequestGateway {
 
 		private final JobID jobID;
 		private final MiniCluster miniCluster;
@@ -182,5 +189,15 @@ public final class PerJobMiniClusterFactory {
 				}
 			});
 		}
+
+		@Override
+		public CompletableFuture<CoordinationResponse> sendCoordinationRequest(OperatorID operatorId, CoordinationRequest request) {
+			try {
+				SerializedValue<CoordinationRequest> serializedRequest = new SerializedValue<>(request);
+				return miniCluster.deliverCoordinationRequestToCoordinator(jobID, operatorId, serializedRequest);
+			} catch (IOException e) {
+				return FutureUtils.completedExceptionally(e);
+			}
+		}
 	}
 }
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index f2c6aa1..0b6601c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -37,9 +37,12 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.highavailability.ClientHighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.rest.FileUpload;
 import org.apache.flink.runtime.rest.RestClient;
 import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
@@ -67,6 +70,9 @@ import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationHeaders;
+import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationRequestBody;
 import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
 import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusHeaders;
 import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusMessageParameters;
@@ -88,6 +94,7 @@ import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.function.CheckedSupplier;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
@@ -411,6 +418,36 @@ public class RestClusterClient<T> implements ClusterClient<T> {
 		return triggerSavepoint(jobId, savepointDirectory, false);
 	}
 
+	@Override
+	public CompletableFuture<CoordinationResponse> sendCoordinationRequest(
+			JobID jobId,
+			OperatorID operatorId,
+			CoordinationRequest request) {
+		ClientCoordinationHeaders headers = ClientCoordinationHeaders.getInstance();
+		ClientCoordinationMessageParameters params = new ClientCoordinationMessageParameters();
+		params.jobPathParameter.resolve(jobId);
+		params.operatorPathParameter.resolve(operatorId);
+
+		SerializedValue<CoordinationRequest> serializedRequest;
+		try {
+			serializedRequest = new SerializedValue<>(request);
+		} catch (IOException e) {
+			return FutureUtils.completedExceptionally(e);
+		}
+
+		ClientCoordinationRequestBody requestBody = new ClientCoordinationRequestBody(serializedRequest);
+		return sendRequest(headers, params, requestBody).thenApply(
+			responseBody -> {
+				try {
+					return responseBody
+						.getSerializedCoordinationResponse()
+						.deserializeValue(getClass().getClassLoader());
+				} catch (IOException | ClassNotFoundException e) {
+					throw new CompletionException("Failed to deserialize coordination response", e);
+				}
+			});
+	}
+
 	private CompletableFuture<String> triggerSavepoint(
 			final JobID jobId,
 			final @Nullable String savepointDirectory,
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java b/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java
index 46df5ea..25124a0 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java
@@ -23,8 +23,11 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.util.function.TriFunction;
 
 import javax.annotation.Nonnull;
@@ -133,6 +136,14 @@ public class TestingClusterClient<T> implements ClusterClient<T> {
 	}
 
 	@Override
+	public CompletableFuture<CoordinationResponse> sendCoordinationRequest(
+			JobID jobId,
+			OperatorID operatorId,
+			CoordinationRequest request) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
 	public void close() {
 
 	}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index ff1f5ff..a55b9e7 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -39,10 +39,13 @@ import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.rest.FileUpload;
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.RestClient;
@@ -76,6 +79,10 @@ import org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody
 import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationHeaders;
+import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationRequestBody;
+import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationResponseBody;
 import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
 import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusHeaders;
 import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusMessageParameters;
@@ -701,6 +708,74 @@ public class RestClusterClientTest extends TestLogger {
 		}
 	}
 
+	@Test
+	public void testSendCoordinationRequest() throws Exception {
+		final TestClientCoordinationHandler handler = new TestClientCoordinationHandler();
+
+		try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(handler)) {
+			RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
+
+			String payload = "testing payload";
+			TestCoordinationRequest<String> request  = new TestCoordinationRequest<>(payload);
+			try {
+				CompletableFuture<CoordinationResponse> future =
+					restClusterClient.sendCoordinationRequest(jobId, new OperatorID(), request);
+				TestCoordinationResponse response = (TestCoordinationResponse) future.get();
+
+				assertEquals(payload, response.payload);
+			} finally {
+				restClusterClient.close();
+			}
+		}
+	}
+
+	private class TestClientCoordinationHandler extends TestHandler<ClientCoordinationRequestBody, ClientCoordinationResponseBody, ClientCoordinationMessageParameters> {
+
+		private TestClientCoordinationHandler() {
+			super(ClientCoordinationHeaders.getInstance());
+		}
+
+		@Override
+		@SuppressWarnings("unchecked")
+		protected CompletableFuture<ClientCoordinationResponseBody> handleRequest(@Nonnull HandlerRequest<ClientCoordinationRequestBody, ClientCoordinationMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
+			try {
+				TestCoordinationRequest req =
+					(TestCoordinationRequest) request
+						.getRequestBody()
+						.getSerializedCoordinationRequest()
+						.deserializeValue(getClass().getClassLoader());
+				TestCoordinationResponse resp = new TestCoordinationResponse(req.payload);
+				return CompletableFuture.completedFuture(
+					new ClientCoordinationResponseBody(
+						new SerializedValue<>(resp)));
+			} catch (Exception e) {
+				return FutureUtils.completedExceptionally(e);
+			}
+		}
+	}
+
+	private static class TestCoordinationRequest<T> implements CoordinationRequest {
+
+		private static final long serialVersionUID = 1L;
+
+		private final T payload;
+
+		private TestCoordinationRequest(T payload) {
+			this.payload = payload;
+		}
+	}
+
+	private static class TestCoordinationResponse<T> implements CoordinationResponse {
+
+		private static final long serialVersionUID = 1L;
+
+		private final T payload;
+
+		private TestCoordinationResponse(T payload) {
+			this.payload = payload;
+		}
+	}
+
 	private class TestAccumulatorHandler extends TestHandler<EmptyRequestBody, JobAccumulatorsInfo, JobAccumulatorsMessageParameters> {
 
 		public TestAccumulatorHandler() {
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index af55a7a..b14b741 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -1430,6 +1430,39 @@
       "type" : "any"
     }
   }, {
+    "url" : "/jobs/:jobid/coordinators/:operatorid",
+    "method" : "POST",
+    "status-code" : "200 OK",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ {
+        "key" : "jobid"
+      }, {
+        "key" : "operatorid"
+      } ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ ]
+    },
+    "request" : {
+      "type" : "object",
+      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:coordination:ClientCoordinationRequestBody",
+      "properties" : {
+        "serializedCoordinationRequest" : {
+          "type" : "any"
+        }
+      }
+    },
+    "response" : {
+      "type" : "object",
+      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:coordination:ClientCoordinationResponseBody",
+      "properties" : {
+        "serializedCoordinationResult" : {
+          "type" : "any"
+        }
+      }
+    }
+  }, {
     "url" : "/jobs/:jobid/exceptions",
     "method" : "GET",
     "status-code" : "200 OK",
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 05aab6e..dde3908 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmanager.JobGraphWriter;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl;
@@ -55,6 +56,8 @@ import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceOverview;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
@@ -66,6 +69,7 @@ import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.function.BiConsumerWithException;
 import org.apache.flink.util.function.CheckedSupplier;
 import org.apache.flink.util.function.FunctionUtils;
@@ -625,6 +629,19 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 		return CompletableFuture.completedFuture(Acknowledge.get());
 	}
 
+	@Override
+	public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
+			JobID jobId,
+			OperatorID operatorId,
+			SerializedValue<CoordinationRequest> serializedRequest,
+			Time timeout) {
+		final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
+
+		return jobMasterGatewayFuture.thenCompose(
+			(JobMasterGateway jobMasterGateway) ->
+				jobMasterGateway.deliverCoordinationRequestToCoordinator(operatorId, serializedRequest, timeout));
+	}
+
 	/**
 	 * Cleans up the job related data from the dispatcher. If cleanupHA is true, then
 	 * the data will also be removed from HA.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 543ef12..d5087cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -62,6 +62,8 @@ import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.UnknownKvStateLocation;
@@ -718,6 +720,19 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 		return CompletableFuture.completedFuture(aggregateFunction.getResult(accumulator));
 	}
 
+	@Override
+	public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
+			OperatorID operatorId,
+			SerializedValue<CoordinationRequest> serializedRequest,
+			Time timeout) {
+		try {
+			CoordinationRequest request = serializedRequest.deserializeValue(userCodeLoader);
+			return schedulerNG.deliverCoordinationRequestToCoordinator(operatorId, request);
+		} catch (Exception e) {
+			return FutureUtils.completedExceptionally(e);
+		}
+	}
+
 	//----------------------------------------------------------------------------------------------
 	// Internal methods
 	//----------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 1006dad..16decc3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -33,8 +33,11 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
@@ -44,6 +47,7 @@ import org.apache.flink.runtime.taskexecutor.AccumulatorReport;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
+import org.apache.flink.util.SerializedValue;
 
 import javax.annotation.Nullable;
 
@@ -271,4 +275,19 @@ public interface JobMasterGateway extends
 	 * @return The updated aggregate
 	 */
 	CompletableFuture<Object> updateGlobalAggregate(String aggregateName, Object aggregand, byte[] serializedAggregationFunction);
+
+	/**
+	 * Deliver a coordination request to a specified coordinator and return the response.
+	 *
+	 * @param operatorId identifying the coordinator to receive the request
+	 * @param serializedRequest serialized request to deliver
+	 * @param timeout RPC timeout for the delivery
+	 * @return A future containing the response. The future fails with a
+	 *         {@link org.apache.flink.util.FlinkException} if the task is not running, no
+	 *         operator/coordinator exists for the given ID, or the coordinator cannot handle client events.
+	 */
+	CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
+		OperatorID operatorId,
+		SerializedValue<CoordinationRequest> serializedRequest,
+		@RpcTimeout Time timeout);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 556af4a..58e08ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -50,6 +50,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -60,6 +61,8 @@ import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.ReporterSetup;
 import org.apache.flink.runtime.metrics.groups.ProcessMetricGroup;
 import org.apache.flink.runtime.metrics.util.MetricUtils;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
@@ -79,6 +82,7 @@ import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.function.FunctionUtils;
 
 import org.slf4j.Logger;
@@ -601,6 +605,16 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 		return runDispatcherCommand(dispatcherGateway -> dispatcherGateway.requestJob(jobId, rpcTimeout));
 	}
 
+	public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
+			JobID jobId,
+			OperatorID operatorId,
+			SerializedValue<CoordinationRequest> serializedRequest) {
+		return runDispatcherCommand(
+			dispatcherGateway ->
+				dispatcherGateway.deliverCoordinationRequestToCoordinator(
+					jobId, operatorId, serializedRequest, rpcTimeout));
+	}
+
 	private <T> CompletableFuture<T> runDispatcherCommand(Function<DispatcherGateway, CompletableFuture<T>> dispatcherCommand) {
 		return getDispatcherGatewayFuture().thenApply(dispatcherCommand).thenCompose(Function.identity());
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequest.java
new file mode 100644
index 0000000..a02678d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequest.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import java.io.Serializable;
+
+/**
+ * Root interface for all requests sent from the client to an {@link OperatorCoordinator},
+ * each of which expects a {@link CoordinationResponse} in return.
+ */
+public interface CoordinationRequest extends Serializable {}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequestGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequestGateway.java
new file mode 100644
index 0000000..0767569
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequestGateway.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Client interface which sends out a {@link CoordinationRequest} and
+ * expects a {@link CoordinationResponse} from an {@link OperatorCoordinator}.
+ */
+public interface CoordinationRequestGateway {
+
+	/**
+	 * Send out a request to a specified coordinator and return the response.
+	 *
+	 * @param operatorId specifies which coordinator to receive the request
+	 * @param request the request to send
+	 * @return the response from the coordinator
+	 */
+	CompletableFuture<CoordinationResponse> sendCoordinationRequest(OperatorID operatorId, CoordinationRequest request);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequestHandler.java
new file mode 100644
index 0000000..3514d1e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequestHandler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Coordinator interface which can handle {@link CoordinationRequest}s
+ * and respond with {@link CoordinationResponse}s to the client.
+ */
+public interface CoordinationRequestHandler {
+
+	/**
+	 * Called when receiving a request from the client.
+	 *
+	 * @param request the request received
+	 * @return a future containing the response from the coordinator for this request
+	 */
+	CompletableFuture<CoordinationResponse> handleCoordinationRequest(CoordinationRequest request);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationResponse.java
new file mode 100644
index 0000000..d28e30b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationResponse.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import java.io.Serializable;
+
+/**
+ * Root interface for all responses sent from an {@link OperatorCoordinator} to the client
+ * in reply to a {@link CoordinationRequest}.
+ */
+public interface CoordinationResponse extends Serializable {}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/coordination/ClientCoordinationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/coordination/ClientCoordinationHandler.java
new file mode 100644
index 0000000..e361f64
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/coordination/ClientCoordinationHandler.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.coordination;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.OperatorIDPathParameter;
+import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationRequestBody;
+import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationResponseBody;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/**
+ * Handler that receives the coordination requests from the client and returns the response from the coordinator.
+ */
+public class ClientCoordinationHandler extends AbstractRestHandler<RestfulGateway, ClientCoordinationRequestBody, ClientCoordinationResponseBody, ClientCoordinationMessageParameters> {
+
+	public ClientCoordinationHandler(
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			Time timeout,
+			Map<String, String> responseHeaders,
+			MessageHeaders<ClientCoordinationRequestBody, ClientCoordinationResponseBody, ClientCoordinationMessageParameters> messageHeaders) {
+		super(leaderRetriever, timeout, responseHeaders, messageHeaders);
+	}
+
+	@Override
+	protected CompletableFuture<ClientCoordinationResponseBody> handleRequest(
+			@Nonnull HandlerRequest<ClientCoordinationRequestBody, ClientCoordinationMessageParameters> request,
+			@Nonnull RestfulGateway gateway) throws RestHandlerException {
+		JobID jobId = request.getPathParameter(JobIDPathParameter.class);
+		OperatorID operatorId = request.getPathParameter(OperatorIDPathParameter.class);
+		SerializedValue<CoordinationRequest> serializedRequest =
+			request.getRequestBody().getSerializedCoordinationRequest();
+		CompletableFuture<CoordinationResponse> responseFuture =
+			gateway.deliverCoordinationRequestToCoordinator(jobId, operatorId, serializedRequest, timeout);
+		return responseFuture.thenApply(
+			coordinationResponse -> {
+				try {
+					return new ClientCoordinationResponseBody(new SerializedValue<>(coordinationResponse));
+				} catch (IOException e) {
+					throw new CompletionException(
+						new RestHandlerException(
+							"Failed to serialize coordination response",
+							HttpResponseStatus.INTERNAL_SERVER_ERROR,
+							e));
+				}
+			});
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/OperatorIDPathParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/OperatorIDPathParameter.java
new file mode 100644
index 0000000..87d985c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/OperatorIDPathParameter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.util.StringUtils;
+
+/**
+ * Path parameter identifying operators.
+ */
+public class OperatorIDPathParameter extends MessagePathParameter<OperatorID> {
+
+	public static final String KEY = "operatorid";
+
+	public OperatorIDPathParameter() {
+		super(KEY);
+	}
+
+	@Override
+	protected OperatorID convertFromString(String value) throws ConversionException {
+		try {
+			return new OperatorID(StringUtils.hexStringToByte(value));
+		} catch (IllegalArgumentException e) {
+			// Unparseable input (e.g. non-hex characters or an unexpected length) is reported as a
+			// conversion failure instead of leaking an unchecked exception to the REST layer.
+			throw new ConversionException("Not a valid operator ID: " + value, e);
+		}
+	}
+
+	@Override
+	protected String convertToString(OperatorID value) {
+		return value.toString();
+	}
+
+	@Override
+	public String getDescription() {
+		return "string value that identifies an operator.";
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationHeaders.java
new file mode 100644
index 0000000..aded1e3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationHeaders.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.coordination;
+
+import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.coordination.ClientCoordinationHandler;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link ClientCoordinationHandler}.
+ */
+@Documentation.ExcludeFromDocumentation(
+	"This API is not exposed to the users, as coordinators are used only internally.")
+public class ClientCoordinationHeaders implements MessageHeaders<ClientCoordinationRequestBody, ClientCoordinationResponseBody, ClientCoordinationMessageParameters> {
+
+	public static final String URL = "/jobs/:jobid/coordinators/:operatorid";
+
+	private static final ClientCoordinationHeaders INSTANCE = new ClientCoordinationHeaders();
+
+	private ClientCoordinationHeaders() {}
+
+	@Override
+	public Class<ClientCoordinationRequestBody> getRequestClass() {
+		return ClientCoordinationRequestBody.class;
+	}
+
+	@Override
+	public Class<ClientCoordinationResponseBody> getResponseClass() {
+		return ClientCoordinationResponseBody.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+
+	@Override
+	public ClientCoordinationMessageParameters getUnresolvedMessageParameters() {
+		return new ClientCoordinationMessageParameters();
+	}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.POST;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return URL;
+	}
+
+	public static ClientCoordinationHeaders getInstance() {
+		return INSTANCE;
+	}
+
+	@Override
+	public String getDescription() {
+		return "Send a request to a specified coordinator of the specified job and get the response. " +
+			"This API is for internal use only.";
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationMessageParameters.java
new file mode 100644
index 0000000..01be123
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationMessageParameters.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.coordination;
+
+import org.apache.flink.runtime.rest.handler.job.coordination.ClientCoordinationHandler;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.rest.messages.OperatorIDPathParameter;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * {@link MessageParameters} for {@link ClientCoordinationHandler}.
+ */
+public class ClientCoordinationMessageParameters extends MessageParameters {
+
+	public final JobIDPathParameter jobPathParameter = new JobIDPathParameter();
+	public final OperatorIDPathParameter operatorPathParameter = new OperatorIDPathParameter();
+
+	@Override
+	public Collection<MessagePathParameter<?>> getPathParameters() {
+		return Arrays.asList(jobPathParameter, operatorPathParameter);
+	}
+
+	@Override
+	public Collection<MessageQueryParameter<?>> getQueryParameters() {
+		return Collections.emptyList();
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationRequestBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationRequestBody.java
new file mode 100644
index 0000000..e663320
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationRequestBody.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.coordination;
+
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.json.SerializedValueDeserializer;
+import org.apache.flink.runtime.rest.messages.json.SerializedValueSerializer;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+/**
+ * Request that carries a serialized {@link CoordinationRequest} to a specified coordinator.
+ */
+public class ClientCoordinationRequestBody implements RequestBody {
+
+	public static final String FIELD_NAME_SERIALIZED_COORDINATION_REQUEST = "serializedCoordinationRequest";
+
+	@JsonProperty(FIELD_NAME_SERIALIZED_COORDINATION_REQUEST)
+	@JsonSerialize(using = SerializedValueSerializer.class)
+	@JsonDeserialize(using = SerializedValueDeserializer.class)
+	private final SerializedValue<CoordinationRequest> serializedCoordinationRequest;
+
+	@JsonCreator
+	public ClientCoordinationRequestBody(
+		@JsonProperty(FIELD_NAME_SERIALIZED_COORDINATION_REQUEST)
+			SerializedValue<CoordinationRequest> serializedCoordinationRequest) {
+		this.serializedCoordinationRequest = serializedCoordinationRequest;
+	}
+
+	@JsonIgnore
+	public SerializedValue<CoordinationRequest> getSerializedCoordinationRequest() {
+		return serializedCoordinationRequest;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationResponseBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationResponseBody.java
new file mode 100644
index 0000000..5d1cdf2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationResponseBody.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.coordination;
+
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.json.SerializedValueDeserializer;
+import org.apache.flink.runtime.rest.messages.json.SerializedValueSerializer;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+/**
+ * Response that carries a serialized {@link CoordinationResponse} to the client.
+ */
+public class ClientCoordinationResponseBody implements ResponseBody {
+
+	public static final String FIELD_NAME_SERIALIZED_COORDINATION_RESULT = "serializedCoordinationResult";
+
+	@JsonProperty(FIELD_NAME_SERIALIZED_COORDINATION_RESULT)
+	@JsonSerialize(using = SerializedValueSerializer.class)
+	@JsonDeserialize(using = SerializedValueDeserializer.class)
+	private final SerializedValue<CoordinationResponse> serializedCoordinationResponse;
+
+	@JsonCreator
+	public ClientCoordinationResponseBody(
+		@JsonProperty(FIELD_NAME_SERIALIZED_COORDINATION_RESULT)
+			SerializedValue<CoordinationResponse> serializedCoordinationResponse) {
+		this.serializedCoordinationResponse = serializedCoordinationResponse;
+	}
+
+	@JsonIgnore
+	public SerializedValue<CoordinationResponse> getSerializedCoordinationResponse() {
+		return serializedCoordinationResponse;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 2be0e5c..7cc17dc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -76,6 +76,9 @@ import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
@@ -107,6 +110,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -164,6 +168,8 @@ public abstract class SchedulerBase implements SchedulerNG {
 
 	protected final ExecutionVertexVersioner executionVertexVersioner;
 
+	private final Map<OperatorID, OperatorCoordinator> coordinatorMap;
+
 	private ComponentMainThreadExecutor mainThreadExecutor = new ComponentMainThreadExecutor.DummyComponentMainThreadExecutor(
 		"SchedulerBase is not initialized with proper main thread executor. " +
 			"Call to SchedulerBase.setMainThreadExecutor(...) required.");
@@ -222,6 +228,8 @@ public abstract class SchedulerBase implements SchedulerNG {
 		this.schedulingTopology = executionGraph.getSchedulingTopology();
 
 		this.inputsLocationsRetriever = new ExecutionGraphToInputsLocationsRetrieverAdapter(executionGraph);
+
+		this.coordinatorMap = createCoordinatorMap();
 	}
 
 	private ExecutionGraph createAndRestoreExecutionGraph(
@@ -932,6 +940,20 @@ public abstract class SchedulerBase implements SchedulerNG {
 		}
 	}
 
+	@Override
+	public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
+			OperatorID operator,
+			CoordinationRequest request) throws FlinkException {
+		final OperatorCoordinator target = coordinatorMap.get(operator);
+		if (target == null) {
+			throw new FlinkException("Coordinator of operator " + operator + " does not exist");
+		}
+		if (!(target instanceof CoordinationRequestHandler)) {
+			throw new FlinkException("Coordinator of operator " + operator + " cannot handle client event");
+		}
+		return ((CoordinationRequestHandler) target).handleCoordinationRequest(request);
+	}
+
 	private void startAllOperatorCoordinators() {
 		final Collection<OperatorCoordinator> coordinators = getAllCoordinators();
 		try {
@@ -951,8 +973,16 @@ public abstract class SchedulerBase implements SchedulerNG {
 	}
 
 	private Collection<OperatorCoordinator> getAllCoordinators() {
-		return getExecutionGraph().getAllVertices().values().stream()
-			.flatMap((vertex) -> vertex.getOperatorCoordinators().stream())
-			.collect(Collectors.toList());
+		return coordinatorMap.values();
+	}
+
+	private Map<OperatorID, OperatorCoordinator> createCoordinatorMap() {
+		// index every coordinator of the execution graph by the ID of the operator it belongs to
+		final Map<OperatorID, OperatorCoordinator> coordinatorsByOperator = new HashMap<>();
+		for (ExecutionJobVertex jobVertex : getExecutionGraph().getAllVertices().values()) {
+			// each vertex already exposes its coordinators keyed by operator ID, so copy them in bulk
+			coordinatorsByOperator.putAll(jobVertex.getOperatorCoordinatorMap());
+		}
+		return coordinatorsByOperator;
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
index 5a75d37..07f7e7e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
@@ -40,6 +40,8 @@ import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.query.KvStateLocation;
@@ -138,4 +140,14 @@ public interface SchedulerNG {
 	 *                        for the given ID.
 	 */
 	void deliverOperatorEventToCoordinator(ExecutionAttemptID taskExecution, OperatorID operator, OperatorEvent evt) throws FlinkException;
+
+	/**
+	 * Delivers a coordination request to the {@link OperatorCoordinator} with the given {@link OperatorID}
+	 * and returns the coordinator's response.
+	 *
+	 * @return A future containing the response.
+	 * @throws FlinkException Thrown, if no coordinator exists for the given operator ID,
+	 *                        or the coordinator cannot handle client events.
+	 */
+	CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(OperatorID operator, CoordinationRequest request) throws FlinkException;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index be0e45a..c736565 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -26,15 +26,19 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.metrics.dump.MetricQueryService;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.util.SerializedValue;
 
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
@@ -186,4 +190,24 @@ public interface RestfulGateway extends RpcGateway {
 	default CompletableFuture<Acknowledge> shutDownCluster() {
 		throw new UnsupportedOperationException();
 	}
+
+	/**
+	 * Deliver a coordination request to a specified coordinator and return the response.
+	 *
+	 * @param jobId identifying the job which the coordinator belongs to
+	 * @param operatorId identifying the coordinator to receive the request
+	 * @param serializedRequest serialized request to deliver
+	 * @param timeout RPC timeout
+	 * @return A future containing the response.
+	 *         The response will fail with a {@link org.apache.flink.util.FlinkException}
+	 *         if no coordinator exists for the given operator ID,
+	 *         or the coordinator cannot handle client events.
+	 */
+	default CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
+			JobID jobId,
+			OperatorID operatorId,
+			SerializedValue<CoordinationRequest> serializedRequest,
+			@RpcTimeout Time timeout) {
+		throw new UnsupportedOperationException();
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index c17b2b7..fd67f86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -64,6 +64,7 @@ import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatistic
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler;
+import org.apache.flink.runtime.rest.handler.job.coordination.ClientCoordinationHandler;
 import org.apache.flink.runtime.rest.handler.job.metrics.AggregatingJobsMetricsHandler;
 import org.apache.flink.runtime.rest.handler.job.metrics.AggregatingSubtasksMetricsHandler;
 import org.apache.flink.runtime.rest.handler.job.metrics.AggregatingTaskManagersMetricsHandler;
@@ -119,6 +120,7 @@ import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.job.SubtaskCurrentAttemptDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumulatorsHeaders;
 import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsHeaders;
+import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerCustomLogHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerLogFileHeaders;
@@ -559,6 +561,12 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 		final ClusterDataSetDeleteHandlers.ClusterDataSetDeleteStatusHandler clusterDataSetDeleteStatusHandler =
 			clusterDataSetDeleteHandlers.new ClusterDataSetDeleteStatusHandler(leaderRetriever, timeout, responseHeaders);
 
+		final ClientCoordinationHandler clientCoordinationHandler = new ClientCoordinationHandler(
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			ClientCoordinationHeaders.getInstance());
+
 		final ShutdownHandler shutdownHandler = new ShutdownHandler(
 			leaderRetriever,
 			timeout,
@@ -625,6 +633,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 		handlers.add(Tuple2.of(clusterDataSetListHandler.getMessageHeaders(), clusterDataSetListHandler));
 		handlers.add(Tuple2.of(clusterDataSetDeleteTriggerHandler.getMessageHeaders(), clusterDataSetDeleteTriggerHandler));
 		handlers.add(Tuple2.of(clusterDataSetDeleteStatusHandler.getMessageHeaders(), clusterDataSetDeleteStatusHandler));
+		handlers.add(Tuple2.of(clientCoordinationHandler.getMessageHeaders(), clientCoordinationHandler));
 
 		// TODO: Remove once the Yarn proxy can forward all REST verbs
 		handlers.add(Tuple2.of(YarnCancelJobTerminationHeaders.getInstance(), yarnJobCancelTerminationHandler));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
index 47b9206..59b3cc5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
@@ -42,6 +42,8 @@ import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.registration.RegistrationResponse;
@@ -157,6 +159,9 @@ public class TestingJobMasterGateway implements JobMasterGateway {
 	@Nonnull
 	private final TriFunction<ExecutionAttemptID, OperatorID, SerializedValue<OperatorEvent>, CompletableFuture<Acknowledge>> operatorEventSender;
 
+	@Nonnull
+	private final BiFunction<OperatorID, SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> deliverCoordinationRequestFunction;
+
 	public TestingJobMasterGateway(
 			@Nonnull String address,
 			@Nonnull String hostname,
@@ -185,7 +190,8 @@ public class TestingJobMasterGateway implements JobMasterGateway {
 			@Nonnull Function<Tuple6<JobID, JobVertexID, KeyGroupRange, String, KvStateID, InetSocketAddress>, CompletableFuture<Acknowledge>> notifyKvStateRegisteredFunction,
 			@Nonnull Function<Tuple4<JobID, JobVertexID, KeyGroupRange, String>, CompletableFuture<Acknowledge>> notifyKvStateUnregisteredFunction,
 			@Nonnull TriFunction<String, Object, byte[], CompletableFuture<Object>> updateAggregateFunction,
-			@Nonnull TriFunction<ExecutionAttemptID, OperatorID, SerializedValue<OperatorEvent>, CompletableFuture<Acknowledge>> operatorEventSender) {
+			@Nonnull TriFunction<ExecutionAttemptID, OperatorID, SerializedValue<OperatorEvent>, CompletableFuture<Acknowledge>> operatorEventSender,
+			@Nonnull BiFunction<OperatorID, SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> deliverCoordinationRequestFunction) {
 		this.address = address;
 		this.hostname = hostname;
 		this.cancelFunction = cancelFunction;
@@ -214,6 +220,7 @@ public class TestingJobMasterGateway implements JobMasterGateway {
 		this.notifyKvStateUnregisteredFunction = notifyKvStateUnregisteredFunction;
 		this.updateAggregateFunction = updateAggregateFunction;
 		this.operatorEventSender = operatorEventSender;
+		this.deliverCoordinationRequestFunction = deliverCoordinationRequestFunction;
 	}
 
 	@Override
@@ -360,4 +367,9 @@ public class TestingJobMasterGateway implements JobMasterGateway {
 	public CompletableFuture<Acknowledge> sendOperatorEventToCoordinator(ExecutionAttemptID task, OperatorID operatorID, SerializedValue<OperatorEvent> event) {
 		return operatorEventSender.apply(task, operatorID, event);
 	}
+
+	@Override
+	public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(OperatorID operatorId, SerializedValue<CoordinationRequest> serializedRequest, Time timeout) {
+		return deliverCoordinationRequestFunction.apply(operatorId, serializedRequest);
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
index a960448..20202cd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
@@ -41,6 +41,8 @@ import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.UnknownKvStateLocation;
@@ -102,6 +104,7 @@ public class TestingJobMasterGatewayBuilder {
 	private Function<Tuple4<JobID, JobVertexID, KeyGroupRange, String>, CompletableFuture<Acknowledge>> notifyKvStateUnregisteredFunction = ignored -> CompletableFuture.completedFuture(Acknowledge.get());
 	private TriFunction<String, Object, byte[], CompletableFuture<Object>> updateAggregateFunction = (a, b, c) -> CompletableFuture.completedFuture(new Object());
 	private TriFunction<ExecutionAttemptID, OperatorID, SerializedValue<OperatorEvent>, CompletableFuture<Acknowledge>> operatorEventSender = (a, b, c) -> CompletableFuture.completedFuture(Acknowledge.get());
+	private BiFunction<OperatorID, SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> deliverCoordinationRequestFunction = (a, b) -> FutureUtils.completedExceptionally(new UnsupportedOperationException());
 
 	public TestingJobMasterGatewayBuilder setAddress(String address) {
 		this.address = address;
@@ -243,6 +246,11 @@ public class TestingJobMasterGatewayBuilder {
 		return this;
 	}
 
+	public TestingJobMasterGatewayBuilder setDeliverCoordinationRequestFunction(BiFunction<OperatorID, SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> deliverCoordinationRequestFunction) {
+		this.deliverCoordinationRequestFunction = deliverCoordinationRequestFunction;
+		return this;
+	}
+
 	public TestingJobMasterGateway build() {
 		return new TestingJobMasterGateway(
 			address,
@@ -272,6 +280,7 @@ public class TestingJobMasterGatewayBuilder {
 			notifyKvStateRegisteredFunction,
 			notifyKvStateUnregisteredFunction,
 			updateAggregateFunction,
-			operatorEventSender);
+			operatorEventSender,
+			deliverCoordinationRequestFunction);
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index 17a82f0..8ecf172 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.operators.coordination;
 
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
@@ -35,6 +36,7 @@ import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
@@ -163,6 +165,52 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 		assertThat(result, futureFailedWith(TestException.class));
 	}
 
+	@Test
+	public void testDeliveringClientRequestToRequestHandler() throws Exception {
+		final OperatorCoordinator.Provider provider = new TestingCoordinationRequestHandler.Provider(testOperatorId);
+		final DefaultScheduler scheduler = createScheduler(provider);
+
+		final String payload = "testing payload";
+		final TestingCoordinationRequestHandler.Request<String> request =
+			new TestingCoordinationRequestHandler.Request<>(payload);
+		// a wildcard cast is a checked cast, so no unchecked-warning suppression is needed
+		final TestingCoordinationRequestHandler.Response<?> response =
+			(TestingCoordinationRequestHandler.Response<?>)
+				scheduler.deliverCoordinationRequestToCoordinator(testOperatorId, request).get();
+
+		assertEquals(payload, response.getPayload());
+	}
+
+	@Test
+	public void testDeliveringClientRequestToNonRequestHandler() throws Exception {
+		// the coordinator created here is expected not to be a CoordinationRequestHandler
+		final DefaultScheduler scheduler =
+			createScheduler(new TestingOperatorCoordinator.Provider(testOperatorId));
+
+		final TestingCoordinationRequestHandler.Request<String> clientRequest =
+			new TestingCoordinationRequestHandler.Request<>("testing payload");
+
+		CommonTestUtils.assertThrows(
+			"cannot handle client event",
+			FlinkException.class,
+			() -> scheduler.deliverCoordinationRequestToCoordinator(testOperatorId, clientRequest));
+	}
+
+	@Test
+	public void testDeliveringClientRequestToNonExistingCoordinator() throws Exception {
+		final DefaultScheduler scheduler =
+			createScheduler(new TestingOperatorCoordinator.Provider(testOperatorId));
+
+		final TestingCoordinationRequestHandler.Request<String> clientRequest =
+			new TestingCoordinationRequestHandler.Request<>("testing payload");
+
+		// the request targets a newly created OperatorID rather than testOperatorId
+		CommonTestUtils.assertThrows(
+			"does not exist",
+			FlinkException.class,
+			() -> scheduler.deliverCoordinationRequestToCoordinator(new OperatorID(), clientRequest));
+	}
+
 	// ------------------------------------------------------------------------
 	//  test setups
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingCoordinationRequestHandler.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingCoordinationRequestHandler.java
new file mode 100644
index 0000000..4aa3c43
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingCoordinationRequestHandler.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A simple testing implementation of the {@link CoordinationRequestHandler} that echoes the request payload.
+ */
+public class TestingCoordinationRequestHandler extends TestingOperatorCoordinator implements CoordinationRequestHandler {
+
+	public TestingCoordinationRequestHandler(Context context) {
+		super(context);
+	}
+
+	@Override
+	public CompletableFuture<CoordinationResponse> handleCoordinationRequest(CoordinationRequest request) {
+		Request<?> req = (Request<?>) request;
+		return CompletableFuture.completedFuture(new Response<Object>(req.getPayload()));
+	}
+
+	/**
+	 * A testing stub for an {@link OperatorCoordinator.Provider} that creates a
+	 * {@link TestingCoordinationRequestHandler}.
+	 */
+	public static final class Provider implements OperatorCoordinator.Provider {
+
+		private static final long serialVersionUID = 1L;
+
+		private final OperatorID operatorId;
+
+		public Provider(OperatorID operatorId) {
+			this.operatorId = operatorId;
+		}
+
+		@Override
+		public OperatorID getOperatorId() {
+			return operatorId;
+		}
+
+		@Override
+		public OperatorCoordinator create(Context context) {
+			return new TestingCoordinationRequestHandler(context);
+		}
+	}
+
+	/**
+	 * A {@link CoordinationRequest} that a {@link TestingCoordinationRequestHandler} receives.
+	 *
+	 * @param <T> payload type
+	 */
+	public static class Request<T> implements CoordinationRequest {
+
+		private static final long serialVersionUID = 1L;
+
+		private final T payload;
+
+		public Request(T payload) {
+			this.payload = payload;
+		}
+
+		public T getPayload() {
+			return payload;
+		}
+	}
+
+	/**
+	 * A {@link CoordinationResponse} that a {@link TestingCoordinationRequestHandler} gives.
+	 *
+	 * @param <T> payload type
+	 */
+	public static class Response<T> implements CoordinationResponse {
+
+		private static final long serialVersionUID = 1L;
+
+		private final T payload;
+
+		public Response(T payload) {
+			this.payload = payload;
+		}
+
+		public T getPayload() {
+			return payload;
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
index 4afd56d..becf513 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
@@ -29,12 +29,17 @@ import org.apache.flink.runtime.dispatcher.DispatcherId;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
 import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.function.TriFunction;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -92,7 +97,8 @@ public final class TestingDispatcherGateway extends TestingRestfulGateway implem
 			DispatcherId fencingToken,
 			Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestArchivedJobFunction,
 			Supplier<CompletableFuture<Acknowledge>> clusterShutdownSupplier,
-			Function<ApplicationStatus, CompletableFuture<Acknowledge>> clusterShutdownWithStatusFunction) {
+			Function<ApplicationStatus, CompletableFuture<Acknowledge>> clusterShutdownWithStatusFunction,
+			TriFunction<JobID, OperatorID, SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> deliverCoordinationRequestToCoordinatorFunction) {
 		super(
 			address,
 			hostname,
@@ -107,7 +113,8 @@ public final class TestingDispatcherGateway extends TestingRestfulGateway implem
 			requestOperatorBackPressureStatsFunction,
 			triggerSavepointFunction,
 			stopWithSavepointFunction,
-			clusterShutdownSupplier);
+			clusterShutdownSupplier,
+			deliverCoordinationRequestToCoordinatorFunction);
 		this.submitFunction = submitFunction;
 		this.listFunction = listFunction;
 		this.blobServerPort = blobServerPort;
@@ -219,7 +226,8 @@ public final class TestingDispatcherGateway extends TestingRestfulGateway implem
 				fencingToken,
 				requestArchivedJobFunction,
 				clusterShutdownSupplier,
-				clusterShutdownWithStatusFunction);
+				clusterShutdownWithStatusFunction,
+				deliverCoordinationRequestToCoordinatorFunction);
 		}
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
index b7cf02d..0acc5ac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
@@ -26,12 +26,17 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.function.TriFunction;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -57,6 +62,7 @@ public class TestingRestfulGateway implements RestfulGateway {
 	static final BiFunction<JobID, JobVertexID, CompletableFuture<OperatorBackPressureStatsResponse>> DEFAULT_REQUEST_OPERATOR_BACK_PRESSURE_STATS_SUPPLIER = (jobId, jobVertexId) -> FutureUtils.completedExceptionally(new UnsupportedOperationException());
 	static final BiFunction<JobID, String, CompletableFuture<String>> DEFAULT_TRIGGER_SAVEPOINT_FUNCTION = (JobID jobId, String targetDirectory) -> FutureUtils.completedExceptionally(new UnsupportedOperationException());
 	static final BiFunction<JobID, String, CompletableFuture<String>> DEFAULT_STOP_WITH_SAVEPOINT_FUNCTION = (JobID jobId, String targetDirectory) -> FutureUtils.completedExceptionally(new UnsupportedOperationException());
+	static final TriFunction<JobID, OperatorID, SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> DEFAULT_DELIVER_COORDINATION_REQUEST_TO_COORDINATOR_FUNCTION = (JobID jobId, OperatorID operatorId, SerializedValue<CoordinationRequest> serializedRequest) -> FutureUtils.completedExceptionally(new UnsupportedOperationException());
 	static final String LOCALHOST = "localhost";
 
 	protected String address;
@@ -89,6 +95,8 @@ public class TestingRestfulGateway implements RestfulGateway {
 
 	protected BiFunction<JobID, String, CompletableFuture<String>> stopWithSavepointFunction;
 
+	protected TriFunction<JobID, OperatorID, SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> deliverCoordinationRequestToCoordinatorFunction;
+
 	public TestingRestfulGateway() {
 		this(
 			LOCALHOST,
@@ -104,7 +112,8 @@ public class TestingRestfulGateway implements RestfulGateway {
 			DEFAULT_REQUEST_OPERATOR_BACK_PRESSURE_STATS_SUPPLIER,
 			DEFAULT_TRIGGER_SAVEPOINT_FUNCTION,
 			DEFAULT_STOP_WITH_SAVEPOINT_FUNCTION,
-			DEFAULT_CLUSTER_SHUTDOWN_SUPPLIER);
+			DEFAULT_CLUSTER_SHUTDOWN_SUPPLIER,
+			DEFAULT_DELIVER_COORDINATION_REQUEST_TO_COORDINATOR_FUNCTION);
 	}
 
 	public TestingRestfulGateway(
@@ -121,7 +130,8 @@ public class TestingRestfulGateway implements RestfulGateway {
 			BiFunction<JobID, JobVertexID, CompletableFuture<OperatorBackPressureStatsResponse>> requestOperatorBackPressureStatsFunction,
 			BiFunction<JobID, String, CompletableFuture<String>> triggerSavepointFunction,
 			BiFunction<JobID, String, CompletableFuture<String>> stopWithSavepointFunction,
-			Supplier<CompletableFuture<Acknowledge>> clusterShutdownSupplier) {
+			Supplier<CompletableFuture<Acknowledge>> clusterShutdownSupplier,
+			TriFunction<JobID, OperatorID, SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> deliverCoordinationRequestToCoordinatorFunction) {
 		this.address = address;
 		this.hostname = hostname;
 		this.cancelJobFunction = cancelJobFunction;
@@ -136,6 +146,7 @@ public class TestingRestfulGateway implements RestfulGateway {
 		this.triggerSavepointFunction = triggerSavepointFunction;
 		this.stopWithSavepointFunction = stopWithSavepointFunction;
 		this.clusterShutdownSupplier = clusterShutdownSupplier;
+		this.deliverCoordinationRequestToCoordinatorFunction = deliverCoordinationRequestToCoordinatorFunction;
 	}
 
 	@Override
@@ -199,6 +210,15 @@ public class TestingRestfulGateway implements RestfulGateway {
 	}
 
 	@Override
+	public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
+			JobID jobId,
+			OperatorID operatorId,
+			SerializedValue<CoordinationRequest> serializedRequest,
+			Time timeout) {
+		return deliverCoordinationRequestToCoordinatorFunction.apply(jobId, operatorId, serializedRequest);
+	}
+
+	@Override
 	public String getAddress() {
 		return address;
 	}
@@ -229,6 +249,7 @@ public class TestingRestfulGateway implements RestfulGateway {
 		protected BiFunction<JobID, JobVertexID, CompletableFuture<OperatorBackPressureStatsResponse>> requestOperatorBackPressureStatsFunction;
 		protected BiFunction<JobID, String, CompletableFuture<String>> triggerSavepointFunction;
 		protected BiFunction<JobID, String, CompletableFuture<String>> stopWithSavepointFunction;
+		protected TriFunction<JobID, OperatorID, SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> deliverCoordinationRequestToCoordinatorFunction;
 
 		protected AbstractBuilder() {
 			cancelJobFunction = DEFAULT_CANCEL_JOB_FUNCTION;
@@ -243,6 +264,7 @@ public class TestingRestfulGateway implements RestfulGateway {
 			triggerSavepointFunction = DEFAULT_TRIGGER_SAVEPOINT_FUNCTION;
 			stopWithSavepointFunction = DEFAULT_STOP_WITH_SAVEPOINT_FUNCTION;
 			clusterShutdownSupplier = DEFAULT_CLUSTER_SHUTDOWN_SUPPLIER;
+			deliverCoordinationRequestToCoordinatorFunction = DEFAULT_DELIVER_COORDINATION_REQUEST_TO_COORDINATOR_FUNCTION;
 		}
 
 		public T setAddress(String address) {
@@ -315,6 +337,11 @@ public class TestingRestfulGateway implements RestfulGateway {
 			return self();
 		}
 
+		public T setDeliverCoordinationRequestToCoordinatorFunction(TriFunction<JobID, OperatorID, SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> deliverCoordinationRequestToCoordinatorFunction) {
+			this.deliverCoordinationRequestToCoordinatorFunction = deliverCoordinationRequestToCoordinatorFunction; // overwrites whatever function this builder held before
+			return self(); // returns self() (typed T) instead of void, presumably so setter calls can be chained
+		}
+
 		protected abstract T self();
 
 		public abstract TestingRestfulGateway build();
@@ -346,7 +373,8 @@ public class TestingRestfulGateway implements RestfulGateway {
 				requestOperatorBackPressureStatsFunction,
 				triggerSavepointFunction,
 				stopWithSavepointFunction,
-				clusterShutdownSupplier);
+				clusterShutdownSupplier,
+				deliverCoordinationRequestToCoordinatorFunction);
 		}
 	}
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
index 00bc2b9..e548f5e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
@@ -31,10 +31,13 @@ import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.jobmaster.JobResult.Builder;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -274,6 +277,14 @@ public class RemoteStreamEnvironmentTest extends TestLogger {
 				@Nullable String savepointDirectory) {
 			return null;
 		}
+
+		@Override
+		public CompletableFuture<CoordinationResponse> sendCoordinationRequest(
+				JobID jobId,
+				OperatorID operatorId,
+				CoordinationRequest request) {
+			return null; // stub: all three arguments are ignored and null (not a future) is returned
+		}
 	}
 
 }