You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2022/02/15 15:29:57 UTC
[flink] branch master updated: [FLINK-26077][runtime] Support operators send request to Coordinator and return a response (#18737)
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new fdb8010 [FLINK-26077][runtime] Support operators send request to Coordinator and return a response (#18737)
fdb8010 is described below
commit fdb80108a3c0e4fb12dbc3f89ecb2327d97deebf
Author: Jark Wu <ja...@apache.org>
AuthorDate: Tue Feb 15 23:29:03 2022 +0800
[FLINK-26077][runtime] Support operators send request to Coordinator and return a response (#18737)
---
.../state/api/runtime/SavepointEnvironment.java | 9 +++++
.../jobgraph/tasks/TaskOperatorEventGateway.java | 23 +++++++++---
.../apache/flink/runtime/jobmaster/JobMaster.java | 18 ++++++---
.../jobmaster/JobMasterOperatorEventGateway.java | 9 ++++-
.../coordination/CoordinationRequestHandler.java | 4 +-
.../rpc/RpcTaskOperatorEventGateway.java | 8 ++++
.../jobmaster/utils/TestingJobMasterGateway.java | 6 +++
.../CoordinatorEventsExactlyOnceITCase.java | 43 +++++++++++++++++++++-
.../TaskExecutorOperatorEventHandlingTest.java | 40 ++++++++++++++++++++
.../taskmanager/NoOpTaskOperatorEventGateway.java | 10 +++++
10 files changed, 153 insertions(+), 17 deletions(-)
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
index 682c4cf..ae4ff9a 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
@@ -47,6 +47,8 @@ import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -60,6 +62,7 @@ import org.apache.flink.util.UserCodeClassLoader;
import java.util.Collections;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import static org.apache.flink.runtime.memory.MemoryManager.DEFAULT_PAGE_SIZE;
@@ -369,5 +372,11 @@ public class SavepointEnvironment implements Environment {
@Override
public void sendOperatorEventToCoordinator(
OperatorID operator, SerializedValue<OperatorEvent> event) {}
+
+ @Override
+ public CompletableFuture<CoordinationResponse> sendRequestToCoordinator(
+ OperatorID operator, SerializedValue<CoordinationRequest> request) {
+ return CompletableFuture.completedFuture(null);
+ }
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/TaskOperatorEventGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/TaskOperatorEventGateway.java
index e44b9b8..9fc9c44 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/TaskOperatorEventGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/TaskOperatorEventGateway.java
@@ -21,17 +21,21 @@ package org.apache.flink.runtime.jobgraph.tasks;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.JobMasterOperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.SerializedValue;
+import java.util.concurrent.CompletableFuture;
+
/**
- * Gateway to send an {@link OperatorEvent} from a Task to to the {@link OperatorCoordinator}
- * JobManager side.
+ * Gateway to send an {@link OperatorEvent} or {@link CoordinationRequest} from a Task to the
+ * {@link OperatorCoordinator} on the JobManager side.
*
- * <p>This is the first step in the chain of sending Operator Events from Operator to Coordinator.
- * Each layer adds further context, so that the inner layers do not need to know about the complete
- * context, which keeps dependencies small and makes testing easier.
+ * <p>This is the first step in the chain of sending Operator Events and Requests from Operator to
+ * Coordinator. Each layer adds further context, so that the inner layers do not need to know about
+ * the complete context, which keeps dependencies small and makes testing easier.
*
* <pre>
* <li>{@code OperatorEventGateway} takes the event, enriches the event with the {@link OperatorID}, and
@@ -44,8 +48,15 @@ import org.apache.flink.util.SerializedValue;
public interface TaskOperatorEventGateway {
/**
- * Send an event from the operator (identified by the given operator ID) to the operator
+ * Sends an event from the operator (identified by the given operator ID) to the operator
* coordinator (identified by the same ID).
*/
void sendOperatorEventToCoordinator(OperatorID operator, SerializedValue<OperatorEvent> event);
+
+ /**
+ * Sends a request from the current operator to the operator coordinator identified by the given
+ * operator ID and returns a future holding the response.
+ */
+ CompletableFuture<CoordinationResponse> sendRequestToCoordinator(
+ OperatorID operator, SerializedValue<CoordinationRequest> request);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 90ab4db..af89974 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -565,6 +565,17 @@ public class JobMaster extends PermanentlyFencedRpcEndpoint<JobMasterId>
}
@Override
+ public CompletableFuture<CoordinationResponse> sendRequestToCoordinator(
+ OperatorID operatorID, SerializedValue<CoordinationRequest> serializedRequest) {
+ try {
+ final CoordinationRequest request = serializedRequest.deserializeValue(userCodeLoader);
+ return schedulerNG.deliverCoordinationRequestToCoordinator(operatorID, request);
+ } catch (Exception e) {
+ return FutureUtils.completedExceptionally(e);
+ }
+ }
+
+ @Override
public CompletableFuture<KvStateLocation> requestKvStateLocation(
final JobID jobId, final String registrationName) {
try {
@@ -876,12 +887,7 @@ public class JobMaster extends PermanentlyFencedRpcEndpoint<JobMasterId>
OperatorID operatorId,
SerializedValue<CoordinationRequest> serializedRequest,
Time timeout) {
- try {
- CoordinationRequest request = serializedRequest.deserializeValue(userCodeLoader);
- return schedulerNG.deliverCoordinationRequestToCoordinator(operatorId, request);
- } catch (Exception e) {
- return FutureUtils.completedExceptionally(e);
- }
+ return this.sendRequestToCoordinator(operatorId, serializedRequest);
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterOperatorEventGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterOperatorEventGateway.java
index 714a4bd..80d728d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterOperatorEventGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterOperatorEventGateway.java
@@ -22,6 +22,8 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.SerializedValue;
@@ -29,8 +31,8 @@ import org.apache.flink.util.SerializedValue;
import java.util.concurrent.CompletableFuture;
/**
- * Gateway to send an {@link OperatorEvent} from the Task Manager to to the {@link
- * OperatorCoordinator} on the JobManager side.
+ * Gateway to send an {@link OperatorEvent} or {@link CoordinationRequest} from the Task Manager to
+ * the {@link OperatorCoordinator} on the JobManager side.
*
* <p>This is the first step in the chain of sending Operator Events from Operator to Coordinator.
* Each layer adds further context, so that the inner layers do not need to know about the complete
@@ -48,4 +50,7 @@ public interface JobMasterOperatorEventGateway {
CompletableFuture<Acknowledge> sendOperatorEventToCoordinator(
ExecutionAttemptID task, OperatorID operatorID, SerializedValue<OperatorEvent> event);
+
+ CompletableFuture<CoordinationResponse> sendRequestToCoordinator(
+ OperatorID operatorID, SerializedValue<CoordinationRequest> request);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequestHandler.java
index 495cfff..4c7ddc3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequestHandler.java
@@ -22,12 +22,12 @@ import java.util.concurrent.CompletableFuture;
/**
* Coordinator interface which can handle {@link CoordinationRequest}s and response with {@link
- * CoordinationResponse}s to the client.
+ * CoordinationResponse}s to the client or operator.
*/
public interface CoordinationRequestHandler {
/**
- * Called when receiving a request from the client.
+ * Called when receiving a request from the client or operator.
*
* @param request the request received
* @return a future containing the response from the coordinator for this request
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcTaskOperatorEventGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcTaskOperatorEventGateway.java
index 1a4a968..709e0f4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcTaskOperatorEventGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcTaskOperatorEventGateway.java
@@ -23,6 +23,8 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.jobmaster.JobMasterOperatorEventGateway;
import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.SerializedValue;
@@ -64,4 +66,10 @@ public class RpcTaskOperatorEventGateway implements TaskOperatorEventGateway {
}
});
}
+
+ @Override
+ public CompletableFuture<CoordinationResponse> sendRequestToCoordinator(
+ OperatorID operator, SerializedValue<CoordinationRequest> request) {
+ return rpcGateway.sendRequestToCoordinator(operator, request);
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
index 09c4f9e..bee9b19 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
@@ -532,6 +532,12 @@ public class TestingJobMasterGateway implements JobMasterGateway {
}
@Override
+ public CompletableFuture<CoordinationResponse> sendRequestToCoordinator(
+ OperatorID operatorID, SerializedValue<CoordinationRequest> request) {
+ return deliverCoordinationRequestFunction.apply(operatorID, request);
+ }
+
+ @Override
public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
OperatorID operatorId,
SerializedValue<CoordinationRequest> serializedRequest,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
index 54a337d..9bf4106 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
@@ -84,7 +84,9 @@ import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkState;
+import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -280,6 +282,22 @@ public class CoordinatorEventsExactlyOnceITCase extends TestLogger {
}
}
+ private static final class IntegerRequest implements CoordinationRequest {
+ final int value;
+
+ private IntegerRequest(int value) {
+ this.value = value;
+ }
+ }
+
+ private static final class IntegerResponse implements CoordinationResponse {
+ final int value;
+
+ private IntegerResponse(int value) {
+ this.value = value;
+ }
+ }
+
// ------------------------------------------------------------------------
/**
@@ -292,7 +310,8 @@ public class CoordinatorEventsExactlyOnceITCase extends TestLogger {
* concurrency against the scheduler thread that calls this coordinator implements a simple
* mailbox that moves the method handling into a separate thread, but keeps the order.
*/
- private static final class EventSendingCoordinator implements OperatorCoordinator {
+ private static final class EventSendingCoordinator
+ implements OperatorCoordinator, CoordinationRequestHandler {
private final Context context;
@@ -532,6 +551,17 @@ public class CoordinatorEventsExactlyOnceITCase extends TestLogger {
context.failJob(new Exception("test failure"));
}
}
+
+ @Override
+ public CompletableFuture<CoordinationResponse> handleCoordinationRequest(
+ CoordinationRequest request) {
+ if (request instanceof IntegerRequest) {
+ int value = ((IntegerRequest) request).value;
+ return CompletableFuture.completedFuture(new IntegerResponse(value + 1));
+ } else {
+ throw new UnsupportedOperationException("Unsupported request type: " + request);
+ }
+ }
}
// ------------------------------------------------------------------------
@@ -566,6 +596,17 @@ public class CoordinatorEventsExactlyOnceITCase extends TestLogger {
.sendOperatorEventToCoordinator(
operatorID, new SerializedValue<>(new StartEvent()));
+ // verify the request & response communication
+ CoordinationResponse response =
+ getEnvironment()
+ .getOperatorCoordinatorEventGateway()
+ .sendRequestToCoordinator(
+ operatorID, new SerializedValue<>(new IntegerRequest(100)))
+ .get();
+
+ assertThat(response, instanceOf(IntegerResponse.class));
+ assertEquals(101, ((IntegerResponse) response).value);
+
// poor-man's mailbox
Object next;
while (running && !((next = actions.take()) instanceof EndEvent)) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventHandlingTest.java
index 16deaaa..9014ec9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventHandlingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventHandlingTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.MetricRegistryTestUtils;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TestOperatorEvent;
+import org.apache.flink.runtime.operators.coordination.TestingCoordinationRequestHandler;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.testutils.CancelableInvokable;
@@ -117,6 +118,21 @@ public class TaskExecutorOperatorEventHandlingTest extends TestLogger {
}
}
+ @Test
+ public void requestToCoordinatorDeliveryFailureFailsTask() throws Exception {
+ final JobID jobId = new JobID();
+ final ExecutionAttemptID eid = new ExecutionAttemptID();
+
+ try (TaskSubmissionTestEnvironment env =
+ createExecutorWithRunningTask(
+ jobId, eid, CoordinationRequestSendingInvokable.class)) {
+ final Task task = env.getTaskSlotTable().getTask(eid);
+
+ task.getExecutingThread().join(10_000);
+ assertEquals(ExecutionState.FAILED, task.getExecutionState());
+ }
+ }
+
// ------------------------------------------------------------------------
// test setup helpers
// ------------------------------------------------------------------------
@@ -148,6 +164,10 @@ public class TaskExecutorOperatorEventHandlingTest extends TestLogger {
(eio, oid, value) -> {
throw new RuntimeException();
})
+ .setDeliverCoordinationRequestFunction(
+ (oid, value) -> {
+ throw new RuntimeException();
+ })
.build())
.build();
@@ -225,4 +245,24 @@ public class TaskExecutorOperatorEventHandlingTest extends TestLogger {
waitUntilCancelled();
}
}
+
+ /** Test invokable that sends a coordination request and then waits until cancelled. */
+ public static final class CoordinationRequestSendingInvokable extends CancelableInvokable {
+
+ public CoordinationRequestSendingInvokable(Environment environment) {
+ super(environment);
+ }
+
+ @Override
+ protected void doInvoke() throws Exception {
+ getEnvironment()
+ .getOperatorCoordinatorEventGateway()
+ .sendRequestToCoordinator(
+ new OperatorID(),
+ new SerializedValue<>(
+ new TestingCoordinationRequestHandler.Request<>(0L)));
+
+ waitUntilCancelled();
+ }
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/NoOpTaskOperatorEventGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/NoOpTaskOperatorEventGateway.java
index 04316c6..2f34754 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/NoOpTaskOperatorEventGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/NoOpTaskOperatorEventGateway.java
@@ -21,13 +21,23 @@ package org.apache.flink.runtime.taskmanager;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.jobmaster.JobMasterOperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.SerializedValue;
+import java.util.concurrent.CompletableFuture;
+
/** A test / mock implementation of the {@link JobMasterOperatorEventGateway}. */
public class NoOpTaskOperatorEventGateway implements TaskOperatorEventGateway {
@Override
public void sendOperatorEventToCoordinator(
OperatorID operatorID, SerializedValue<OperatorEvent> event) {}
+
+ @Override
+ public CompletableFuture<CoordinationResponse> sendRequestToCoordinator(
+ OperatorID operator, SerializedValue<CoordinationRequest> request) {
+ return CompletableFuture.completedFuture(null);
+ }
}