You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/08 11:12:36 UTC

[GitHub] [flink] TsReaper opened a new pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

TsReaper opened a new pull request #12037:
URL: https://github.com/apache/flink/pull/12037


   ## What is the purpose of the change
   
   This PR is part of the [FLINK-14807](https://issues.apache.org/jira/browse/FLINK-14807) which is going to introduce a collecting method for tables. See [here](https://docs.google.com/document/d/13Ata18-e89_hAdfukzEJYreOg2FBZO_Y0RohLDAme6Y/edit) for the whole design document.
   
   To allow the clients to receive results from the coordinator, which acts as a proxy between the clients and the sinks, a communication protocol should be introduced between the clients and the coordinators. This PR introduce this new communication protocol through the REST API.
   
   Note that as coordinators are internally used only, we're not going to expose this REST API to the user, thus no user document is provided.
   
   ## Brief change log
   
    - Introduce a REST API to allow the communication between clients and coordinators.
   
   ## Verifying this change
   
   This PR is only an internal API change without real usage. This change can be tested after the whole collecting mechanism is introduced.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: yes (JobManager)
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? not documented


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] godfreyhe commented on a change in pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #12037:
URL: https://github.com/apache/flink/pull/12037#discussion_r422751031



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
##########
@@ -163,6 +163,36 @@ public void taskTaskManagerFailuresAreReportedBack() throws Exception {
 		assertThat(result, futureFailedWith(TestException.class));
 	}
 
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testDeliveringClientRequestToResponser() throws Exception {
+		final OperatorCoordinator.Provider provider = new TestingCoordinationResponser.Provider(testOperatorId);
+		final DefaultScheduler scheduler = createScheduler(provider);
+
+		final String payload = "testing payload";
+		final TestingCoordinationResponser.Request<String> request =
+			new TestingCoordinationResponser.Request<>(payload);
+		final TestingCoordinationResponser.Response<String> response =
+			(TestingCoordinationResponser.Response<String>)
+				scheduler.deliverCoordinationRequestToCoordinator(testOperatorId, request).get();
+
+		assertEquals(payload, response.getPayload());
+	}
+
+	@Test
+	public void testDeliveringClientRequestToNonResponser() throws Exception {
+		final OperatorCoordinator.Provider provider = new TestingOperatorCoordinator.Provider(testOperatorId);
+		final DefaultScheduler scheduler = createScheduler(provider);
+
+		final String payload = "testing payload";
+		final TestingCoordinationResponser.Request<String> request =
+			new TestingCoordinationResponser.Request<>(payload);
+		final CompletableFuture<CoordinationResponse> future =
+			scheduler.deliverCoordinationRequestToCoordinator(testOperatorId, request);
+
+		assertThat(future, futureFailedWith(IllegalArgumentException.class));
+	}
+

Review comment:
       add a test for scenario: "operator id not found"

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -933,6 +941,22 @@ public void deliverOperatorEventToCoordinator(
 		}
 	}
 
+	@Override
+	public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
+			OperatorID operator,
+			CoordinationRequest request) throws FlinkException {
+		OperatorCoordinator coordinator = coordinatorMap.get(operator);
+		if (coordinator instanceof CoordinationResponser) {
+			return CompletableFuture.completedFuture(
+				((CoordinationResponser) coordinator).handleCoordinationRequest(request));
+		} else if (coordinator != null) {
+			return FutureUtils.completedExceptionally(
+				new IllegalArgumentException("Coordinator of operator " + operator + " cannot handle client event"));

Review comment:
       I think we should throw `FlinkException`, this situation is the same as the operation does not exist

##########
File path: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
##########
@@ -411,6 +418,36 @@ public void close() {
 		return triggerSavepoint(jobId, savepointDirectory, false);
 	}
 
+	@Override
+	public CompletableFuture<CoordinationResponse> sendCoordinationRequest(
+			JobID jobId,
+			OperatorID operatorId,
+			CoordinationRequest request) {
+		ClientCoordinationHeaders headers = ClientCoordinationHeaders.getInstance();
+		ClientCoordinationMessageParameters params = new ClientCoordinationMessageParameters();
+		params.jobPathParameter.resolve(jobId);
+		params.operatorPathParameter.resolve(operatorId);
+
+		SerializedValue<CoordinationRequest> serializedRequest;
+		try {
+			serializedRequest = new SerializedValue<>(request);
+		} catch (IOException e) {
+			return FutureUtils.completedExceptionally(e);
+		}
+
+		ClientCoordinationRequestBody requestBody = new ClientCoordinationRequestBody(serializedRequest);
+		return sendRequest(headers, params, requestBody).thenApply(
+			responseBody -> {
+				try {
+					return responseBody
+						.getSerializedCoordinationResponse()
+						.deserializeValue(getClass().getClassLoader());
+				} catch (IOException | ClassNotFoundException e) {
+					throw new RuntimeException("Failed to deserialize coordination response", e);

Review comment:
       throw `CompletionException` like `getAccumulators` method

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationResponser.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+/**
+ * Coordinator interface which can handle {@link CoordinationRequest}s
+ * and response with {@link CoordinationResponse}s to the client.
+ */
+public interface CoordinationResponser {

Review comment:
       rename to `CoordinationHandler` ? similar to `OperatorEventHandler`

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequester.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Client interface which sends out a {@link CoordinationRequest} and
+ * expects for a {@link CoordinationResponse} from a {@link OperatorCoordinator}.
+ */
+public interface CoordinationRequester {

Review comment:
       rename to `CoordinationRequestGateway`? similar to `OperatorEventGateway`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12037:
URL: https://github.com/apache/flink/pull/12037#issuecomment-625766815


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8 (Fri May 08 11:14:48 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12037:
URL: https://github.com/apache/flink/pull/12037#issuecomment-625870801


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=830",
       "triggerID" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d9891558892286ea495d638fdd602b49ceadd43",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=850",
       "triggerID" : "6d9891558892286ea495d638fdd602b49ceadd43",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c06e5db9e64a2b39d857babff53d401697b87262",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=858",
       "triggerID" : "c06e5db9e64a2b39d857babff53d401697b87262",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8317107cba32a7102c19e920b2a9932e40bbc5a7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=932",
       "triggerID" : "8317107cba32a7102c19e920b2a9932e40bbc5a7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "77942bf5fcfee886d9dfd16633c41be3ca8d0f7d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1064",
       "triggerID" : "77942bf5fcfee886d9dfd16633c41be3ca8d0f7d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4dcc0c0f63898b3b1ab8f230257165c1b9497130",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1147",
       "triggerID" : "4dcc0c0f63898b3b1ab8f230257165c1b9497130",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ba87f55e1d19fbf84eec52d084736694497e9307",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "ba87f55e1d19fbf84eec52d084736694497e9307",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4dcc0c0f63898b3b1ab8f230257165c1b9497130 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1147) 
   * ba87f55e1d19fbf84eec52d084736694497e9307 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] tillrohrmann commented on pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on pull request #12037:
URL: https://github.com/apache/flink/pull/12037#issuecomment-628529682


   True. Thanks for the information.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] tillrohrmann commented on pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on pull request #12037:
URL: https://github.com/apache/flink/pull/12037#issuecomment-628733250


   Thanks @TsReaper. Merging this PR once AZP gives green light.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12037:
URL: https://github.com/apache/flink/pull/12037#issuecomment-625870801


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=830",
       "triggerID" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d9891558892286ea495d638fdd602b49ceadd43",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=850",
       "triggerID" : "6d9891558892286ea495d638fdd602b49ceadd43",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c06e5db9e64a2b39d857babff53d401697b87262",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=858",
       "triggerID" : "c06e5db9e64a2b39d857babff53d401697b87262",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8317107cba32a7102c19e920b2a9932e40bbc5a7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=932",
       "triggerID" : "8317107cba32a7102c19e920b2a9932e40bbc5a7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "77942bf5fcfee886d9dfd16633c41be3ca8d0f7d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1064",
       "triggerID" : "77942bf5fcfee886d9dfd16633c41be3ca8d0f7d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4dcc0c0f63898b3b1ab8f230257165c1b9497130",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1147",
       "triggerID" : "4dcc0c0f63898b3b1ab8f230257165c1b9497130",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ba87f55e1d19fbf84eec52d084736694497e9307",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1239",
       "triggerID" : "ba87f55e1d19fbf84eec52d084736694497e9307",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4dcc0c0f63898b3b1ab8f230257165c1b9497130 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1147) 
   * ba87f55e1d19fbf84eec52d084736694497e9307 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1239) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12037:
URL: https://github.com/apache/flink/pull/12037#issuecomment-625870801


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] KurtYoung commented on pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
KurtYoung commented on pull request #12037:
URL: https://github.com/apache/flink/pull/12037#issuecomment-628943268


   AZP passed, I'm merging this then I can start to review another PR based on this.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] TsReaper commented on a change in pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
TsReaper commented on a change in pull request #12037:
URL: https://github.com/apache/flink/pull/12037#discussion_r422757580



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationResponser.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+/**
+ * Coordinator interface which can handle {@link CoordinationRequest}s
+ * and response with {@link CoordinationResponse}s to the client.
+ */
+public interface CoordinationResponser {

Review comment:
       `CoordinationRequestHandler` seems to be better




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] TsReaper commented on a change in pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
TsReaper commented on a change in pull request #12037:
URL: https://github.com/apache/flink/pull/12037#discussion_r424830919



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
##########
@@ -163,6 +165,56 @@ public void taskTaskManagerFailuresAreReportedBack() throws Exception {
 		assertThat(result, futureFailedWith(TestException.class));
 	}
 
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testDeliveringClientRequestToResponser() throws Exception {

Review comment:
       This should be `testDeliveringClientRequestToHandler` now as the name of the interface is changed to `CoordinationRequestHandler`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] TsReaper edited a comment on pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
TsReaper edited a comment on pull request #12037:
URL: https://github.com/apache/flink/pull/12037#issuecomment-628524482


   > @flinkbot run azure
   
   It has already finished, see [here](https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=1239&view=results), all tests have passed except e2e tests which are cancelled.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12037:
URL: https://github.com/apache/flink/pull/12037#issuecomment-625870801


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=830",
       "triggerID" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=830) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12037:
URL: https://github.com/apache/flink/pull/12037#issuecomment-625870801


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=830",
       "triggerID" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d9891558892286ea495d638fdd602b49ceadd43",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=850",
       "triggerID" : "6d9891558892286ea495d638fdd602b49ceadd43",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c06e5db9e64a2b39d857babff53d401697b87262",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=858",
       "triggerID" : "c06e5db9e64a2b39d857babff53d401697b87262",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c06e5db9e64a2b39d857babff53d401697b87262 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=858) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12037:
URL: https://github.com/apache/flink/pull/12037#issuecomment-625870801


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=830",
       "triggerID" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d9891558892286ea495d638fdd602b49ceadd43",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=850",
       "triggerID" : "6d9891558892286ea495d638fdd602b49ceadd43",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c06e5db9e64a2b39d857babff53d401697b87262",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=858",
       "triggerID" : "c06e5db9e64a2b39d857babff53d401697b87262",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8317107cba32a7102c19e920b2a9932e40bbc5a7",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=932",
       "triggerID" : "8317107cba32a7102c19e920b2a9932e40bbc5a7",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8317107cba32a7102c19e920b2a9932e40bbc5a7 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=932) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] TsReaper commented on pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
TsReaper commented on pull request #12037:
URL: https://github.com/apache/flink/pull/12037#issuecomment-628524482


   > @flinkbot run azure
   
   It has finished, see [here](https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=1239&view=results).


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] TsReaper commented on pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
TsReaper commented on pull request #12037:
URL: https://github.com/apache/flink/pull/12037#issuecomment-626123332


   cc @becketqin @StephanEwen @godfreyhe please help for the review, thanks a lot~


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] KurtYoung merged pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
KurtYoung merged pull request #12037:
URL: https://github.com/apache/flink/pull/12037


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12037:
URL: https://github.com/apache/flink/pull/12037#issuecomment-625870801


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=830",
       "triggerID" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d9891558892286ea495d638fdd602b49ceadd43",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=850",
       "triggerID" : "6d9891558892286ea495d638fdd602b49ceadd43",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c06e5db9e64a2b39d857babff53d401697b87262",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=858",
       "triggerID" : "c06e5db9e64a2b39d857babff53d401697b87262",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8317107cba32a7102c19e920b2a9932e40bbc5a7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=932",
       "triggerID" : "8317107cba32a7102c19e920b2a9932e40bbc5a7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "77942bf5fcfee886d9dfd16633c41be3ca8d0f7d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1064",
       "triggerID" : "77942bf5fcfee886d9dfd16633c41be3ca8d0f7d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4dcc0c0f63898b3b1ab8f230257165c1b9497130",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1147",
       "triggerID" : "4dcc0c0f63898b3b1ab8f230257165c1b9497130",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ba87f55e1d19fbf84eec52d084736694497e9307",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1239",
       "triggerID" : "ba87f55e1d19fbf84eec52d084736694497e9307",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d515b32100a8912e2c35e8de1310fc60ad2aa2c9",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1320",
       "triggerID" : "d515b32100a8912e2c35e8de1310fc60ad2aa2c9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d515b32100a8912e2c35e8de1310fc60ad2aa2c9 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1320) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] tillrohrmann commented on pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on pull request #12037:
URL: https://github.com/apache/flink/pull/12037#issuecomment-628521681


   @flinkbot run azure


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12037:
URL: https://github.com/apache/flink/pull/12037#issuecomment-625870801


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=830",
       "triggerID" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d9891558892286ea495d638fdd602b49ceadd43",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=850",
       "triggerID" : "6d9891558892286ea495d638fdd602b49ceadd43",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c06e5db9e64a2b39d857babff53d401697b87262",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "c06e5db9e64a2b39d857babff53d401697b87262",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 6d9891558892286ea495d638fdd602b49ceadd43 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=850) 
   * c06e5db9e64a2b39d857babff53d401697b87262 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12037:
URL: https://github.com/apache/flink/pull/12037#issuecomment-625870801


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=830",
       "triggerID" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d9891558892286ea495d638fdd602b49ceadd43",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=850",
       "triggerID" : "6d9891558892286ea495d638fdd602b49ceadd43",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c06e5db9e64a2b39d857babff53d401697b87262",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=858",
       "triggerID" : "c06e5db9e64a2b39d857babff53d401697b87262",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8317107cba32a7102c19e920b2a9932e40bbc5a7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=932",
       "triggerID" : "8317107cba32a7102c19e920b2a9932e40bbc5a7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "77942bf5fcfee886d9dfd16633c41be3ca8d0f7d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1064",
       "triggerID" : "77942bf5fcfee886d9dfd16633c41be3ca8d0f7d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4dcc0c0f63898b3b1ab8f230257165c1b9497130",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1147",
       "triggerID" : "4dcc0c0f63898b3b1ab8f230257165c1b9497130",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ba87f55e1d19fbf84eec52d084736694497e9307",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1239",
       "triggerID" : "ba87f55e1d19fbf84eec52d084736694497e9307",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ba87f55e1d19fbf84eec52d084736694497e9307 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1239) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] tillrohrmann commented on a change in pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #12037:
URL: https://github.com/apache/flink/pull/12037#discussion_r424914569



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/coordination/ClientCoordinationHandler.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.coordination;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.OperatorIDPathParameter;
+import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationRequestBody;
+import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationResponseBody;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/**
+ * Handler that receives the coordination requests from the client and returns the response from the coordinator.
+ */
+public class ClientCoordinationHandler extends AbstractRestHandler<RestfulGateway, ClientCoordinationRequestBody, ClientCoordinationResponseBody, ClientCoordinationMessageParameters> {
+
+	public ClientCoordinationHandler(
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			Time timeout,
+			Map<String, String> responseHeaders,
+			MessageHeaders<ClientCoordinationRequestBody, ClientCoordinationResponseBody, ClientCoordinationMessageParameters> messageHeaders) {
+		super(leaderRetriever, timeout, responseHeaders, messageHeaders);
+	}
+
+	@Override
+	protected CompletableFuture<ClientCoordinationResponseBody> handleRequest(
+			@Nonnull HandlerRequest<ClientCoordinationRequestBody, ClientCoordinationMessageParameters> request,
+			@Nonnull RestfulGateway gateway) throws RestHandlerException {
+		JobID jobId = request.getPathParameter(JobIDPathParameter.class);
+		OperatorID operatorId = request.getPathParameter(OperatorIDPathParameter.class);
+		SerializedValue<CoordinationRequest> serializedRequest =
+			request.getRequestBody().getSerializedCoordinationRequest();
+		CompletableFuture<CoordinationResponse> responseFuture =
+			gateway.deliverCoordinationRequestToCoordinator(jobId, operatorId, serializedRequest, timeout);
+		return responseFuture.thenApply(
+			coordinationResponse -> {
+				try {
+					return new ClientCoordinationResponseBody(new SerializedValue<>(coordinationResponse));

Review comment:
       Makes sense. I guess we will use another layer of `SerializedValue` if the user requests a user code object from the `OperatorCoordinator` which is then initialized on the JM.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] tillrohrmann commented on pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on pull request #12037:
URL: https://github.com/apache/flink/pull/12037#issuecomment-628663314


   @TsReaper I have merged #12141. You could now rebase this PR on the current master and add the `@ExcludeFromDocumentation` annotation.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12037:
URL: https://github.com/apache/flink/pull/12037#issuecomment-625870801


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=830",
       "triggerID" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d9891558892286ea495d638fdd602b49ceadd43",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=850",
       "triggerID" : "6d9891558892286ea495d638fdd602b49ceadd43",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c06e5db9e64a2b39d857babff53d401697b87262",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=858",
       "triggerID" : "c06e5db9e64a2b39d857babff53d401697b87262",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8317107cba32a7102c19e920b2a9932e40bbc5a7",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=932",
       "triggerID" : "8317107cba32a7102c19e920b2a9932e40bbc5a7",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c06e5db9e64a2b39d857babff53d401697b87262 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=858) 
   * 8317107cba32a7102c19e920b2a9932e40bbc5a7 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=932) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] TsReaper edited a comment on pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
TsReaper edited a comment on pull request #12037:
URL: https://github.com/apache/flink/pull/12037#issuecomment-628342272


   > Are we intentionally not regenerating the REST documentation?
   
   Yes. As coordinators are used only internally, we're not going to expose this to the users so no user documentation is updated. Shall I update `RestAPIDocGenerator`?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12037:
URL: https://github.com/apache/flink/pull/12037#issuecomment-625870801


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=830",
       "triggerID" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d9891558892286ea495d638fdd602b49ceadd43",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=850",
       "triggerID" : "6d9891558892286ea495d638fdd602b49ceadd43",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c06e5db9e64a2b39d857babff53d401697b87262",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=858",
       "triggerID" : "c06e5db9e64a2b39d857babff53d401697b87262",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8317107cba32a7102c19e920b2a9932e40bbc5a7",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8317107cba32a7102c19e920b2a9932e40bbc5a7",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c06e5db9e64a2b39d857babff53d401697b87262 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=858) 
   * 8317107cba32a7102c19e920b2a9932e40bbc5a7 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] TsReaper edited a comment on pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
TsReaper edited a comment on pull request #12037:
URL: https://github.com/apache/flink/pull/12037#issuecomment-628342272


   > Are we intentionally not regenerating the REST documentation?
   
   Yes. As coordinators are used only internally, we're not going to expose this to the users so no user documentation is updated.
   
   I've updated `RestAPIDocGenerator` in FLINK-17680 and #12141 , please take a look.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] TsReaper edited a comment on pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
TsReaper edited a comment on pull request #12037:
URL: https://github.com/apache/flink/pull/12037#issuecomment-628524482


   > @flinkbot run azure
   
   It has already finished, see [here](https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=1239&view=results).


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] TsReaper commented on a change in pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
TsReaper commented on a change in pull request #12037:
URL: https://github.com/apache/flink/pull/12037#discussion_r424831004



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
##########
@@ -163,6 +165,56 @@ public void taskTaskManagerFailuresAreReportedBack() throws Exception {
 		assertThat(result, futureFailedWith(TestException.class));
 	}
 
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testDeliveringClientRequestToResponser() throws Exception {
+		final OperatorCoordinator.Provider provider = new TestingCoordinationRequestHandler.Provider(testOperatorId);
+		final DefaultScheduler scheduler = createScheduler(provider);
+
+		final String payload = "testing payload";
+		final TestingCoordinationRequestHandler.Request<String> request =
+			new TestingCoordinationRequestHandler.Request<>(payload);
+		final TestingCoordinationRequestHandler.Response<String> response =
+			(TestingCoordinationRequestHandler.Response<String>)
+				scheduler.deliverCoordinationRequestToCoordinator(testOperatorId, request).get();
+
+		assertEquals(payload, response.getPayload());
+	}
+
+	@Test(expected = FlinkException.class)
+	public void testDeliveringClientRequestToNonResponser() throws Exception {

Review comment:
       This should be `testDeliveringClientRequestToNonHandler` now as the name of the interface is changed to `CoordinationRequestHandler`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] TsReaper commented on a change in pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
TsReaper commented on a change in pull request #12037:
URL: https://github.com/apache/flink/pull/12037#discussion_r424830919



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
##########
@@ -163,6 +165,56 @@ public void taskTaskManagerFailuresAreReportedBack() throws Exception {
 		assertThat(result, futureFailedWith(TestException.class));
 	}
 
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testDeliveringClientRequestToResponser() throws Exception {

Review comment:
       This should be `testDeliveringClientRequestToRequestHandler` now as the name of the interface is changed to `CoordinationRequestHandler`.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
##########
@@ -163,6 +165,56 @@ public void taskTaskManagerFailuresAreReportedBack() throws Exception {
 		assertThat(result, futureFailedWith(TestException.class));
 	}
 
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testDeliveringClientRequestToResponser() throws Exception {
+		final OperatorCoordinator.Provider provider = new TestingCoordinationRequestHandler.Provider(testOperatorId);
+		final DefaultScheduler scheduler = createScheduler(provider);
+
+		final String payload = "testing payload";
+		final TestingCoordinationRequestHandler.Request<String> request =
+			new TestingCoordinationRequestHandler.Request<>(payload);
+		final TestingCoordinationRequestHandler.Response<String> response =
+			(TestingCoordinationRequestHandler.Response<String>)
+				scheduler.deliverCoordinationRequestToCoordinator(testOperatorId, request).get();
+
+		assertEquals(payload, response.getPayload());
+	}
+
+	@Test(expected = FlinkException.class)
+	public void testDeliveringClientRequestToNonResponser() throws Exception {

Review comment:
       This should be `testDeliveringClientRequestToNonRequestHandler` now as the name of the interface is changed to `CoordinationRequestHandler`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] TsReaper commented on a change in pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
TsReaper commented on a change in pull request #12037:
URL: https://github.com/apache/flink/pull/12037#discussion_r424832705



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/coordination/ClientCoordinationHandler.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.coordination;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.OperatorIDPathParameter;
+import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationRequestBody;
+import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationResponseBody;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/**
+ * Handler that receives the coordination requests from the client and returns the response from the coordinator.
+ */
+public class ClientCoordinationHandler extends AbstractRestHandler<RestfulGateway, ClientCoordinationRequestBody, ClientCoordinationResponseBody, ClientCoordinationMessageParameters> {
+
+	public ClientCoordinationHandler(
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			Time timeout,
+			Map<String, String> responseHeaders,
+			MessageHeaders<ClientCoordinationRequestBody, ClientCoordinationResponseBody, ClientCoordinationMessageParameters> messageHeaders) {
+		super(leaderRetriever, timeout, responseHeaders, messageHeaders);
+	}
+
+	@Override
+	protected CompletableFuture<ClientCoordinationResponseBody> handleRequest(
+			@Nonnull HandlerRequest<ClientCoordinationRequestBody, ClientCoordinationMessageParameters> request,
+			@Nonnull RestfulGateway gateway) throws RestHandlerException {
+		JobID jobId = request.getPathParameter(JobIDPathParameter.class);
+		OperatorID operatorId = request.getPathParameter(OperatorIDPathParameter.class);
+		SerializedValue<CoordinationRequest> serializedRequest =
+			request.getRequestBody().getSerializedCoordinationRequest();
+		CompletableFuture<CoordinationResponse> responseFuture =
+			gateway.deliverCoordinationRequestToCoordinator(jobId, operatorId, serializedRequest, timeout);
+		return responseFuture.thenApply(
+			coordinationResponse -> {
+				try {
+					return new ClientCoordinationResponseBody(new SerializedValue<>(coordinationResponse));

Review comment:
       Because `CoordinationResponse` is an object and should be serialized in order to be passed back to the client through the REST API. Is there a more proper way to achieve this?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] TsReaper commented on pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
TsReaper commented on pull request #12037:
URL: https://github.com/apache/flink/pull/12037#issuecomment-628342272


   > Are we intentionally not regenerating the REST documentation?
   
   Yes. As coordinators are used only internally, we're not going to expose this to the users so no user documentation is updated.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12037:
URL: https://github.com/apache/flink/pull/12037#issuecomment-625870801


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=830",
       "triggerID" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d9891558892286ea495d638fdd602b49ceadd43",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=850",
       "triggerID" : "6d9891558892286ea495d638fdd602b49ceadd43",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c06e5db9e64a2b39d857babff53d401697b87262",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=858",
       "triggerID" : "c06e5db9e64a2b39d857babff53d401697b87262",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 6d9891558892286ea495d638fdd602b49ceadd43 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=850) 
   * c06e5db9e64a2b39d857babff53d401697b87262 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=858) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12037:
URL: https://github.com/apache/flink/pull/12037#issuecomment-625870801


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=830",
       "triggerID" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d9891558892286ea495d638fdd602b49ceadd43",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=850",
       "triggerID" : "6d9891558892286ea495d638fdd602b49ceadd43",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c06e5db9e64a2b39d857babff53d401697b87262",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=858",
       "triggerID" : "c06e5db9e64a2b39d857babff53d401697b87262",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8317107cba32a7102c19e920b2a9932e40bbc5a7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=932",
       "triggerID" : "8317107cba32a7102c19e920b2a9932e40bbc5a7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "77942bf5fcfee886d9dfd16633c41be3ca8d0f7d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1064",
       "triggerID" : "77942bf5fcfee886d9dfd16633c41be3ca8d0f7d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4dcc0c0f63898b3b1ab8f230257165c1b9497130",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1147",
       "triggerID" : "4dcc0c0f63898b3b1ab8f230257165c1b9497130",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ba87f55e1d19fbf84eec52d084736694497e9307",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1239",
       "triggerID" : "ba87f55e1d19fbf84eec52d084736694497e9307",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d515b32100a8912e2c35e8de1310fc60ad2aa2c9",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "d515b32100a8912e2c35e8de1310fc60ad2aa2c9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ba87f55e1d19fbf84eec52d084736694497e9307 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1239) 
   * d515b32100a8912e2c35e8de1310fc60ad2aa2c9 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12037:
URL: https://github.com/apache/flink/pull/12037#issuecomment-625870801


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=830",
       "triggerID" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d9891558892286ea495d638fdd602b49ceadd43",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=850",
       "triggerID" : "6d9891558892286ea495d638fdd602b49ceadd43",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c06e5db9e64a2b39d857babff53d401697b87262",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=858",
       "triggerID" : "c06e5db9e64a2b39d857babff53d401697b87262",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8317107cba32a7102c19e920b2a9932e40bbc5a7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=932",
       "triggerID" : "8317107cba32a7102c19e920b2a9932e40bbc5a7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "77942bf5fcfee886d9dfd16633c41be3ca8d0f7d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1064",
       "triggerID" : "77942bf5fcfee886d9dfd16633c41be3ca8d0f7d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4dcc0c0f63898b3b1ab8f230257165c1b9497130",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1147",
       "triggerID" : "4dcc0c0f63898b3b1ab8f230257165c1b9497130",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ba87f55e1d19fbf84eec52d084736694497e9307",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1239",
       "triggerID" : "ba87f55e1d19fbf84eec52d084736694497e9307",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d515b32100a8912e2c35e8de1310fc60ad2aa2c9",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1320",
       "triggerID" : "d515b32100a8912e2c35e8de1310fc60ad2aa2c9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ba87f55e1d19fbf84eec52d084736694497e9307 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1239) 
   * d515b32100a8912e2c35e8de1310fc60ad2aa2c9 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1320) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12037:
URL: https://github.com/apache/flink/pull/12037#issuecomment-625870801


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=830",
       "triggerID" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=830) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12037:
URL: https://github.com/apache/flink/pull/12037#issuecomment-625870801


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=830",
       "triggerID" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d9891558892286ea495d638fdd602b49ceadd43",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "6d9891558892286ea495d638fdd602b49ceadd43",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=830) 
   * 6d9891558892286ea495d638fdd602b49ceadd43 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12037:
URL: https://github.com/apache/flink/pull/12037#issuecomment-625870801


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=830",
       "triggerID" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d9891558892286ea495d638fdd602b49ceadd43",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=850",
       "triggerID" : "6d9891558892286ea495d638fdd602b49ceadd43",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 6d9891558892286ea495d638fdd602b49ceadd43 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=850) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12037:
URL: https://github.com/apache/flink/pull/12037#issuecomment-625870801


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=830",
       "triggerID" : "ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d9891558892286ea495d638fdd602b49ceadd43",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=850",
       "triggerID" : "6d9891558892286ea495d638fdd602b49ceadd43",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ab6e6c99ed5727eb9cf8891dbf9cd1afcd6e31a8 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=830) 
   * 6d9891558892286ea495d638fdd602b49ceadd43 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=850) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] tillrohrmann commented on pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on pull request #12037:
URL: https://github.com/apache/flink/pull/12037#issuecomment-628434531


   I like the idea to have an internal REST API which we don't expose to the user because it is not intended for public use.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] TsReaper commented on pull request #12037: [FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators

Posted by GitBox <gi...@apache.org>.
TsReaper commented on pull request #12037:
URL: https://github.com/apache/flink/pull/12037#issuecomment-628675695


   @tillrohrmann @zentol I've rebased onto master and marked `ClientCoordinationHeaders` with `@ExcludeFromDocumentation`, please take another look. Thanks.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org