You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2022/02/15 15:29:57 UTC

[flink] branch master updated: [FLINK-26077][runtime] Support operators send request to Coordinator and return a response (#18737)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new fdb8010  [FLINK-26077][runtime] Support operators send request to Coordinator and return a response (#18737)
fdb8010 is described below

commit fdb80108a3c0e4fb12dbc3f89ecb2327d97deebf
Author: Jark Wu <ja...@apache.org>
AuthorDate: Tue Feb 15 23:29:03 2022 +0800

    [FLINK-26077][runtime] Support operators send request to Coordinator and return a response (#18737)
---
 .../state/api/runtime/SavepointEnvironment.java    |  9 +++++
 .../jobgraph/tasks/TaskOperatorEventGateway.java   | 23 +++++++++---
 .../apache/flink/runtime/jobmaster/JobMaster.java  | 18 ++++++---
 .../jobmaster/JobMasterOperatorEventGateway.java   |  9 ++++-
 .../coordination/CoordinationRequestHandler.java   |  4 +-
 .../rpc/RpcTaskOperatorEventGateway.java           |  8 ++++
 .../jobmaster/utils/TestingJobMasterGateway.java   |  6 +++
 .../CoordinatorEventsExactlyOnceITCase.java        | 43 +++++++++++++++++++++-
 .../TaskExecutorOperatorEventHandlingTest.java     | 40 ++++++++++++++++++++
 .../taskmanager/NoOpTaskOperatorEventGateway.java  | 10 +++++
 10 files changed, 153 insertions(+), 17 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
index 682c4cf..ae4ff9a 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
@@ -47,6 +47,8 @@ import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -60,6 +62,7 @@ import org.apache.flink.util.UserCodeClassLoader;
 
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 
 import static org.apache.flink.runtime.memory.MemoryManager.DEFAULT_PAGE_SIZE;
@@ -369,5 +372,11 @@ public class SavepointEnvironment implements Environment {
         @Override
         public void sendOperatorEventToCoordinator(
                 OperatorID operator, SerializedValue<OperatorEvent> event) {}
+
+        @Override
+        public CompletableFuture<CoordinationResponse> sendRequestToCoordinator(
+                OperatorID operator, SerializedValue<CoordinationRequest> request) {
+            return CompletableFuture.completedFuture(null);
+        }
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/TaskOperatorEventGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/TaskOperatorEventGateway.java
index e44b9b8..9fc9c44 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/TaskOperatorEventGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/TaskOperatorEventGateway.java
@@ -21,17 +21,21 @@ package org.apache.flink.runtime.jobgraph.tasks;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JobMasterOperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.util.SerializedValue;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
- * Gateway to send an {@link OperatorEvent} from a Task to to the {@link OperatorCoordinator}
- * JobManager side.
+ * Gateway to send an {@link OperatorEvent} or {@link CoordinationRequest} from a Task to the
+ * {@link OperatorCoordinator} on the JobManager side.
  *
- * <p>This is the first step in the chain of sending Operator Events from Operator to Coordinator.
- * Each layer adds further context, so that the inner layers do not need to know about the complete
- * context, which keeps dependencies small and makes testing easier.
+ * <p>This is the first step in the chain of sending Operator Events and Requests from Operator to
+ * Coordinator. Each layer adds further context, so that the inner layers do not need to know about
+ * the complete context, which keeps dependencies small and makes testing easier.
  *
  * <pre>
  *     <li>{@code OperatorEventGateway} takes the event, enriches the event with the {@link OperatorID}, and
@@ -44,8 +48,15 @@ import org.apache.flink.util.SerializedValue;
 public interface TaskOperatorEventGateway {
 
     /**
-     * Send an event from the operator (identified by the given operator ID) to the operator
+     * Sends an event from the operator (identified by the given operator ID) to the operator
      * coordinator (identified by the same ID).
      */
     void sendOperatorEventToCoordinator(OperatorID operator, SerializedValue<OperatorEvent> event);
+
+    /**
+     * Sends a request from the current operator to the operator coordinator identified by the
+     * given operator ID and returns a future that completes with the response.
+     */
+    CompletableFuture<CoordinationResponse> sendRequestToCoordinator(
+            OperatorID operator, SerializedValue<CoordinationRequest> request);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 90ab4db..af89974 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -565,6 +565,17 @@ public class JobMaster extends PermanentlyFencedRpcEndpoint<JobMasterId>
     }
 
     @Override
+    public CompletableFuture<CoordinationResponse> sendRequestToCoordinator(
+            OperatorID operatorID, SerializedValue<CoordinationRequest> serializedRequest) {
+        try {
+            final CoordinationRequest request = serializedRequest.deserializeValue(userCodeLoader);
+            return schedulerNG.deliverCoordinationRequestToCoordinator(operatorID, request);
+        } catch (Exception e) {
+            return FutureUtils.completedExceptionally(e);
+        }
+    }
+
+    @Override
     public CompletableFuture<KvStateLocation> requestKvStateLocation(
             final JobID jobId, final String registrationName) {
         try {
@@ -876,12 +887,7 @@ public class JobMaster extends PermanentlyFencedRpcEndpoint<JobMasterId>
             OperatorID operatorId,
             SerializedValue<CoordinationRequest> serializedRequest,
             Time timeout) {
-        try {
-            CoordinationRequest request = serializedRequest.deserializeValue(userCodeLoader);
-            return schedulerNG.deliverCoordinationRequestToCoordinator(operatorId, request);
-        } catch (Exception e) {
-            return FutureUtils.completedExceptionally(e);
-        }
+        return this.sendRequestToCoordinator(operatorId, serializedRequest);
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterOperatorEventGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterOperatorEventGateway.java
index 714a4bd..80d728d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterOperatorEventGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterOperatorEventGateway.java
@@ -22,6 +22,8 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.util.SerializedValue;
@@ -29,8 +31,8 @@ import org.apache.flink.util.SerializedValue;
 import java.util.concurrent.CompletableFuture;
 
 /**
- * Gateway to send an {@link OperatorEvent} from the Task Manager to to the {@link
- * OperatorCoordinator} on the JobManager side.
+ * Gateway to send an {@link OperatorEvent} or {@link CoordinationRequest} from the Task Manager to
+ * the {@link OperatorCoordinator} on the JobManager side.
  *
  * <p>This is the first step in the chain of sending Operator Events from Operator to Coordinator.
  * Each layer adds further context, so that the inner layers do not need to know about the complete
@@ -48,4 +50,7 @@ public interface JobMasterOperatorEventGateway {
 
     CompletableFuture<Acknowledge> sendOperatorEventToCoordinator(
             ExecutionAttemptID task, OperatorID operatorID, SerializedValue<OperatorEvent> event);
+
+    CompletableFuture<CoordinationResponse> sendRequestToCoordinator(
+            OperatorID operatorID, SerializedValue<CoordinationRequest> request);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequestHandler.java
index 495cfff..4c7ddc3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequestHandler.java
@@ -22,12 +22,12 @@ import java.util.concurrent.CompletableFuture;
 
 /**
  * Coordinator interface which can handle {@link CoordinationRequest}s and response with {@link
- * CoordinationResponse}s to the client.
+ * CoordinationResponse}s to the client or operator.
  */
 public interface CoordinationRequestHandler {
 
     /**
-     * Called when receiving a request from the client.
+     * Called when receiving a request from the client or operator.
      *
      * @param request the request received
      * @return a future containing the response from the coordinator for this request
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcTaskOperatorEventGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcTaskOperatorEventGateway.java
index 1a4a968..709e0f4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcTaskOperatorEventGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcTaskOperatorEventGateway.java
@@ -23,6 +23,8 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterOperatorEventGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.util.SerializedValue;
 
@@ -64,4 +66,10 @@ public class RpcTaskOperatorEventGateway implements TaskOperatorEventGateway {
                     }
                 });
     }
+
+    @Override
+    public CompletableFuture<CoordinationResponse> sendRequestToCoordinator(
+            OperatorID operator, SerializedValue<CoordinationRequest> request) {
+        return rpcGateway.sendRequestToCoordinator(operator, request);
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
index 09c4f9e..bee9b19 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
@@ -532,6 +532,12 @@ public class TestingJobMasterGateway implements JobMasterGateway {
     }
 
     @Override
+    public CompletableFuture<CoordinationResponse> sendRequestToCoordinator(
+            OperatorID operatorID, SerializedValue<CoordinationRequest> request) {
+        return deliverCoordinationRequestFunction.apply(operatorID, request);
+    }
+
+    @Override
     public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
             OperatorID operatorId,
             SerializedValue<CoordinationRequest> serializedRequest,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
index 54a337d..9bf4106 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
@@ -84,7 +84,9 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkState;
+import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -280,6 +282,22 @@ public class CoordinatorEventsExactlyOnceITCase extends TestLogger {
         }
     }
 
+    private static final class IntegerRequest implements CoordinationRequest {
+        final int value;
+
+        private IntegerRequest(int value) {
+            this.value = value;
+        }
+    }
+
+    private static final class IntegerResponse implements CoordinationResponse {
+        final int value;
+
+        private IntegerResponse(int value) {
+            this.value = value;
+        }
+    }
+
     // ------------------------------------------------------------------------
 
     /**
@@ -292,7 +310,8 @@ public class CoordinatorEventsExactlyOnceITCase extends TestLogger {
      * concurrency against the scheduler thread that calls this coordinator implements a simple
      * mailbox that moves the method handling into a separate thread, but keeps the order.
      */
-    private static final class EventSendingCoordinator implements OperatorCoordinator {
+    private static final class EventSendingCoordinator
+            implements OperatorCoordinator, CoordinationRequestHandler {
 
         private final Context context;
 
@@ -532,6 +551,17 @@ public class CoordinatorEventsExactlyOnceITCase extends TestLogger {
                 context.failJob(new Exception("test failure"));
             }
         }
+
+        @Override
+        public CompletableFuture<CoordinationResponse> handleCoordinationRequest(
+                CoordinationRequest request) {
+            if (request instanceof IntegerRequest) {
+                int value = ((IntegerRequest) request).value;
+                return CompletableFuture.completedFuture(new IntegerResponse(value + 1));
+            } else {
+                throw new UnsupportedOperationException("Unsupported request type: " + request);
+            }
+        }
     }
 
     // ------------------------------------------------------------------------
@@ -566,6 +596,17 @@ public class CoordinatorEventsExactlyOnceITCase extends TestLogger {
                     .sendOperatorEventToCoordinator(
                             operatorID, new SerializedValue<>(new StartEvent()));
 
+            // verify the request & response communication
+            CoordinationResponse response =
+                    getEnvironment()
+                            .getOperatorCoordinatorEventGateway()
+                            .sendRequestToCoordinator(
+                                    operatorID, new SerializedValue<>(new IntegerRequest(100)))
+                            .get();
+
+            assertThat(response, instanceOf(IntegerResponse.class));
+            assertEquals(101, ((IntegerResponse) response).value);
+
             // poor-man's mailbox
             Object next;
             while (running && !((next = actions.take()) instanceof EndEvent)) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventHandlingTest.java
index 16deaaa..9014ec9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventHandlingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventHandlingTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.MetricRegistryTestUtils;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.operators.coordination.TestOperatorEvent;
+import org.apache.flink.runtime.operators.coordination.TestingCoordinationRequestHandler;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.testutils.CancelableInvokable;
@@ -117,6 +118,21 @@ public class TaskExecutorOperatorEventHandlingTest extends TestLogger {
         }
     }
 
+    @Test
+    public void requestToCoordinatorDeliveryFailureFailsTask() throws Exception {
+        final JobID jobId = new JobID();
+        final ExecutionAttemptID eid = new ExecutionAttemptID();
+
+        try (TaskSubmissionTestEnvironment env =
+                createExecutorWithRunningTask(
+                        jobId, eid, CoordinationRequestSendingInvokable.class)) {
+            final Task task = env.getTaskSlotTable().getTask(eid);
+
+            task.getExecutingThread().join(10_000);
+            assertEquals(ExecutionState.FAILED, task.getExecutionState());
+        }
+    }
+
     // ------------------------------------------------------------------------
     //  test setup helpers
     // ------------------------------------------------------------------------
@@ -148,6 +164,10 @@ public class TaskExecutorOperatorEventHandlingTest extends TestLogger {
                                                 (eio, oid, value) -> {
                                                     throw new RuntimeException();
                                                 })
+                                        .setDeliverCoordinationRequestFunction(
+                                                (oid, value) -> {
+                                                    throw new RuntimeException();
+                                                })
                                         .build())
                         .build();
 
@@ -225,4 +245,24 @@ public class TaskExecutorOperatorEventHandlingTest extends TestLogger {
             waitUntilCancelled();
         }
     }
+
+    /** Test invokable that sends a coordination request and then waits until cancelled. */
+    public static final class CoordinationRequestSendingInvokable extends CancelableInvokable {
+
+        public CoordinationRequestSendingInvokable(Environment environment) {
+            super(environment);
+        }
+
+        @Override
+        protected void doInvoke() throws Exception {
+            getEnvironment()
+                    .getOperatorCoordinatorEventGateway()
+                    .sendRequestToCoordinator(
+                            new OperatorID(),
+                            new SerializedValue<>(
+                                    new TestingCoordinationRequestHandler.Request<>(0L)));
+
+            waitUntilCancelled();
+        }
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/NoOpTaskOperatorEventGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/NoOpTaskOperatorEventGateway.java
index 04316c6..2f34754 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/NoOpTaskOperatorEventGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/NoOpTaskOperatorEventGateway.java
@@ -21,13 +21,23 @@ package org.apache.flink.runtime.taskmanager;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterOperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.util.SerializedValue;
 
+import java.util.concurrent.CompletableFuture;
+
 /** A test / mock implementation of the {@link JobMasterOperatorEventGateway}. */
 public class NoOpTaskOperatorEventGateway implements TaskOperatorEventGateway {
 
     @Override
     public void sendOperatorEventToCoordinator(
             OperatorID operatorID, SerializedValue<OperatorEvent> event) {}
+
+    @Override
+    public CompletableFuture<CoordinationResponse> sendRequestToCoordinator(
+            OperatorID operator, SerializedValue<CoordinationRequest> request) {
+        return CompletableFuture.completedFuture(null);
+    }
 }