You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2018/02/11 00:33:11 UTC

[GitHub] flink pull request #5451: [FLINK-8632] [flip6] Introduce generalized asynchr...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/5451

    [FLINK-8632] [flip6] Introduce generalized asynchronous operation handlers

    ## What is the purpose of the change
    
    The asynchronous operation handlers are the generalization of the SavepointHandlers.
    They consist of a Trigger- and a StatusHandler. The TriggerHandler is used to trigger
    an asynchronous operation. The handler stores the operation future and returns a
    trigger id. The trigger id can be used to query the status of the operation via the
    StatusHandler. Once the operation has completed, the StatusHandler will return the
    result.
    
    This PR is based on #5450.
    
    ## Brief change log
    
    - Introduced `AbstractAsynchronousOperationHandlers` derived from `SavepointHandlers`
    - Reworked `SavepointHandlers` according to `AbstractAsynchronousOperationHandlers` interface
    
    ## Verifying this change
    
    - Added `AbstractAsynchronousOperationHandlersTest`
    - Also tested by `SavepointHandlersTest`
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink asynchronousOperationHandlers

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5451.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5451
    
----
commit b0efaa09c3b39e31ba130c668abe39af40473fb4
Author: Till Rohrmann <tr...@...>
Date:   2018-02-03T16:41:51Z

    [FLINK-8631] [rest] Add support for generic types to the RestClient
    
    This commit allows the Restclient to receive generic response types. In order
    to do this, the MessageHeaders contain now information about the generic
    type parameters of the response type.

commit e26cf11281b48bd04372f12363397c9267e4413a
Author: Till Rohrmann <tr...@...>
Date:   2018-02-02T10:23:00Z

    [FLINK-8632] [flip6] Introduce generalized asynchronous operation handlers
    
    The asynchronous operation handlers are the generalization of the SavepointHandlers.
    They consist of a Trigger- and a StatusHandler. The TriggerHandler is used to trigger
    an asynchronous operation. The handler stores the operation future and returns a
    trigger id. The trigger id can be used to query the status of the operation via the
    StatusHandler. Once the operation has completed, the StatusHandler will return the
    result.

----


---

[GitHub] flink pull request #5451: [FLINK-8632] [flip6] Introduce generalized asynchr...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5451


---

[GitHub] flink pull request #5451: [FLINK-8632] [flip6] Introduce generalized asynchr...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5451#discussion_r169682725
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/TestingGatewayRetriever.java ---
    @@ -0,0 +1,46 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.retriever;
    +
    +import org.apache.flink.runtime.rpc.RpcGateway;
    +
    +import java.util.concurrent.CompletableFuture;
    +
    +/**
    + * Testing {@link GatewayRetriever} implementation which can be
    + * manually set.
    + *
    + * @param <T> type of the gateway
    + */
    +public class TestingGatewayRetriever<T extends RpcGateway> implements GatewayRetriever<T> {
    +	private CompletableFuture<T> gatewayFuture;
    +
    +	public TestingGatewayRetriever() {
    +		gatewayFuture = new CompletableFuture<>();
    +	}
    +
    +	@Override
    +	public CompletableFuture<T> getFuture() {
    +		return null;
    --- End diff --
    
    I think there is a better value than `null` to return.


---

[GitHub] flink pull request #5451: [FLINK-8632] [flip6] Introduce generalized asynchr...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5451#discussion_r169696196
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest.java ---
    @@ -0,0 +1,350 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.async;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.runtime.concurrent.FutureUtils;
    +import org.apache.flink.runtime.rest.HttpMethodWrapper;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.HandlerRequestException;
    +import org.apache.flink.runtime.rest.handler.RestHandlerException;
    +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.rest.messages.MessageParameters;
    +import org.apache.flink.runtime.rest.messages.MessagePathParameter;
    +import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
    +import org.apache.flink.runtime.rest.messages.TriggerId;
    +import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
    +import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.runtime.webmonitor.retriever.TestingGatewayRetriever;
    +import org.apache.flink.util.ExceptionUtils;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.ExecutionException;
    +
    +import static org.hamcrest.Matchers.containsString;
    +import static org.hamcrest.Matchers.is;
    +import static org.junit.Assert.assertThat;
    +import static org.junit.Assert.fail;
    +
    +/**
    + * Tests for the {@link AbstractAsynchronousOperationHandlers}.
    + */
    +public class AbstractAsynchronousOperationHandlersTest extends TestLogger {
    +
    +	private static final CompletableFuture<String> localRestAddress = CompletableFuture.completedFuture("localhost");
    +
    +	private static final Time timeout = Time.seconds(10L);
    +
    +	private TestingAsynchronousOperationHandlers testingAsynchronousOperationHandlers;
    +
    +	private TestingAsynchronousOperationHandlers.TestingTriggerHandler testingTriggerHandler;
    +
    +	private TestingAsynchronousOperationHandlers.TestingStatusHandler testingStatusHandler;
    +
    +	private TestingGatewayRetriever<RestfulGateway> testingGatewayRetriever;
    +
    +	@Before
    +	public void setup() {
    +		testingGatewayRetriever = new TestingGatewayRetriever<>();
    +		testingAsynchronousOperationHandlers = new TestingAsynchronousOperationHandlers();
    +
    +		testingTriggerHandler = testingAsynchronousOperationHandlers.new TestingTriggerHandler(
    +			localRestAddress,
    +			testingGatewayRetriever,
    +			timeout,
    +			Collections.emptyMap(),
    +			TestingTriggerMessageHeaders.INSTANCE);
    +
    +		testingStatusHandler = testingAsynchronousOperationHandlers.new TestingStatusHandler(
    +			localRestAddress,
    +			testingGatewayRetriever,
    +			timeout,
    +			Collections.emptyMap(),
    +			TestingStatusMessageHeaders.INSTANCE);
    +	}
    +
    +	/**
    +	 * Tests the triggering and successful completion of an asynchronous operation.
    +	 */
    +	@Test
    +	public void testOperationCompletion() throws Exception {
    +		final CompletableFuture<String> savepointFuture = new CompletableFuture<>();
    +		final TestingRestfulGateway testingRestfulGateway = new TestingRestfulGateway.Builder()
    +			.setTriggerSavepointFunction((JobID jobId, String directory) -> savepointFuture)
    +			.build();
    +
    +		testingGatewayRetriever.setGateway(testingRestfulGateway);
    --- End diff --
    
    Sometimes you set the gateway in the test retriever, sometimes you don't (e.g., `testUnknownTriggerId`). We don't rely on `testingGatewayRetriever` in the tests because `handleRequest` expects the `RestfulGateway` as an argument.


---

[GitHub] flink pull request #5451: [FLINK-8632] [flip6] Introduce generalized asynchr...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5451#discussion_r169676080
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java ---
    @@ -123,214 +109,112 @@ public SavepointHandlers(@Nullable final String defaultSavepointDir) {
     	/**
     	 * HTTP handler to trigger savepoints.
     	 */
    -	public class SavepointTriggerHandler
    -			extends AbstractRestHandler<RestfulGateway, SavepointTriggerRequestBody, SavepointTriggerResponseBody, SavepointTriggerMessageParameters> {
    +	public class SavepointTriggerHandler extends TriggerHandler<RestfulGateway, SavepointTriggerRequestBody, SavepointTriggerMessageParameters> {
     
     		public SavepointTriggerHandler(
    -				final CompletableFuture<String> localRestAddress,
    -				final GatewayRetriever<? extends RestfulGateway> leaderRetriever,
    -				final Time timeout,
    -				final Map<String, String> responseHeaders) {
    +			CompletableFuture<String> localRestAddress,
    +			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
    +			Time timeout,
    +			Map<String, String> responseHeaders) {
     			super(localRestAddress, leaderRetriever, timeout, responseHeaders, SavepointTriggerHeaders.getInstance());
     		}
     
     		@Override
    -		protected CompletableFuture<SavepointTriggerResponseBody> handleRequest(
    -				@Nonnull final HandlerRequest<SavepointTriggerRequestBody, SavepointTriggerMessageParameters> request,
    -				@Nonnull final RestfulGateway gateway) throws RestHandlerException {
    -
    +		protected CompletableFuture<String> triggerOperation(HandlerRequest<SavepointTriggerRequestBody, SavepointTriggerMessageParameters> request, RestfulGateway gateway) throws RestHandlerException {
     			final JobID jobId = request.getPathParameter(JobIDPathParameter.class);
     			final String requestedTargetDirectory = request.getRequestBody().getTargetDirectory();
     
     			if (requestedTargetDirectory == null && defaultSavepointDir == null) {
    -				return FutureUtils.completedExceptionally(
    -					new RestHandlerException(
    +				throw new RestHandlerException(
     						String.format("Config key [%s] is not set. Property [%s] must be provided.",
     							CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
     							SavepointTriggerRequestBody.FIELD_NAME_TARGET_DIRECTORY),
    -						HttpResponseStatus.BAD_REQUEST));
    +						HttpResponseStatus.BAD_REQUEST);
     			}
     
     			final String targetDirectory = requestedTargetDirectory != null ? requestedTargetDirectory : defaultSavepointDir;
    -			final CompletableFuture<String> savepointLocationFuture =
    -				gateway.triggerSavepoint(jobId, targetDirectory, RpcUtils.INF_TIMEOUT);
    -			final SavepointTriggerId savepointTriggerId = new SavepointTriggerId();
    -			completedSavepointCache.registerOngoingSavepoint(
    -				SavepointKey.of(savepointTriggerId, jobId),
    -				savepointLocationFuture);
    -			return CompletableFuture.completedFuture(
    -				new SavepointTriggerResponseBody(savepointTriggerId));
    +			return gateway.triggerSavepoint(jobId, targetDirectory, RpcUtils.INF_TIMEOUT);
    +		}
    +
    +		@Override
    +		protected SavepointKey createOperationKey(HandlerRequest<SavepointTriggerRequestBody, SavepointTriggerMessageParameters> request) {
    +			final JobID jobId = request.getPathParameter(JobIDPathParameter.class);
    +			return SavepointKey.of(new TriggerId(), jobId);
     		}
     	}
     
     	/**
     	 * HTTP handler to query for the status of the savepoint.
     	 */
    -	public class SavepointStatusHandler
    -			extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, SavepointResponseBody, SavepointStatusMessageParameters> {
    +	public class SavepointStatusHandler extends StatusHandler<RestfulGateway, SavepointInfo, SavepointStatusMessageParameters> {
     
     		public SavepointStatusHandler(
    -				final CompletableFuture<String> localRestAddress,
    -				final GatewayRetriever<? extends RestfulGateway> leaderRetriever,
    -				final Time timeout,
    -				final Map<String, String> responseHeaders) {
    +			CompletableFuture<String> localRestAddress,
    --- End diff --
    
    Unnecessary changes: `final` was fine and the indentation is off.


---

[GitHub] flink pull request #5451: [FLINK-8632] [flip6] Introduce generalized asynchr...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5451#discussion_r169675799
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java ---
    @@ -123,214 +109,112 @@ public SavepointHandlers(@Nullable final String defaultSavepointDir) {
     	/**
     	 * HTTP handler to trigger savepoints.
     	 */
    -	public class SavepointTriggerHandler
    -			extends AbstractRestHandler<RestfulGateway, SavepointTriggerRequestBody, SavepointTriggerResponseBody, SavepointTriggerMessageParameters> {
    +	public class SavepointTriggerHandler extends TriggerHandler<RestfulGateway, SavepointTriggerRequestBody, SavepointTriggerMessageParameters> {
     
     		public SavepointTriggerHandler(
    -				final CompletableFuture<String> localRestAddress,
    -				final GatewayRetriever<? extends RestfulGateway> leaderRetriever,
    -				final Time timeout,
    -				final Map<String, String> responseHeaders) {
    +			CompletableFuture<String> localRestAddress,
    --- End diff --
    
    Unnecessary changes: `final` was fine and the indentation is off.


---

[GitHub] flink pull request #5451: [FLINK-8632] [flip6] Introduce generalized asynchr...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5451#discussion_r169706543
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest.java ---
    @@ -0,0 +1,350 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.async;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.runtime.concurrent.FutureUtils;
    +import org.apache.flink.runtime.rest.HttpMethodWrapper;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.HandlerRequestException;
    +import org.apache.flink.runtime.rest.handler.RestHandlerException;
    +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.rest.messages.MessageParameters;
    +import org.apache.flink.runtime.rest.messages.MessagePathParameter;
    +import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
    +import org.apache.flink.runtime.rest.messages.TriggerId;
    +import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
    +import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.runtime.webmonitor.retriever.TestingGatewayRetriever;
    +import org.apache.flink.util.ExceptionUtils;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.ExecutionException;
    +
    +import static org.hamcrest.Matchers.containsString;
    +import static org.hamcrest.Matchers.is;
    +import static org.junit.Assert.assertThat;
    +import static org.junit.Assert.fail;
    +
    +/**
    + * Tests for the {@link AbstractAsynchronousOperationHandlers}.
    + */
    +public class AbstractAsynchronousOperationHandlersTest extends TestLogger {
    +
    +	private static final CompletableFuture<String> localRestAddress = CompletableFuture.completedFuture("localhost");
    +
    +	private static final Time timeout = Time.seconds(10L);
    +
    +	private TestingAsynchronousOperationHandlers testingAsynchronousOperationHandlers;
    +
    +	private TestingAsynchronousOperationHandlers.TestingTriggerHandler testingTriggerHandler;
    +
    +	private TestingAsynchronousOperationHandlers.TestingStatusHandler testingStatusHandler;
    +
    +	private TestingGatewayRetriever<RestfulGateway> testingGatewayRetriever;
    +
    +	@Before
    +	public void setup() {
    +		testingGatewayRetriever = new TestingGatewayRetriever<>();
    +		testingAsynchronousOperationHandlers = new TestingAsynchronousOperationHandlers();
    +
    +		testingTriggerHandler = testingAsynchronousOperationHandlers.new TestingTriggerHandler(
    +			localRestAddress,
    +			testingGatewayRetriever,
    +			timeout,
    +			Collections.emptyMap(),
    +			TestingTriggerMessageHeaders.INSTANCE);
    +
    +		testingStatusHandler = testingAsynchronousOperationHandlers.new TestingStatusHandler(
    +			localRestAddress,
    +			testingGatewayRetriever,
    +			timeout,
    +			Collections.emptyMap(),
    +			TestingStatusMessageHeaders.INSTANCE);
    +	}
    +
    +	/**
    +	 * Tests the triggering and successful completion of an asynchronous operation.
    +	 */
    +	@Test
    +	public void testOperationCompletion() throws Exception {
    +		final CompletableFuture<String> savepointFuture = new CompletableFuture<>();
    +		final TestingRestfulGateway testingRestfulGateway = new TestingRestfulGateway.Builder()
    +			.setTriggerSavepointFunction((JobID jobId, String directory) -> savepointFuture)
    +			.build();
    +
    +		testingGatewayRetriever.setGateway(testingRestfulGateway);
    --- End diff --
    
    You're right. Will remove it.


---

[GitHub] flink pull request #5451: [FLINK-8632] [flip6] Introduce generalized asynchr...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5451#discussion_r169687850
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationResult.java ---
    @@ -0,0 +1,74 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.async;
    +
    +import org.apache.flink.runtime.rest.messages.queue.AsynchronouslyCreatedResource;
    +import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
    +
    +import javax.annotation.Nullable;
    +
    +/**
    + * Result of an asynchronous operation.
    + *
    + * @param <V> type of the result value
    + */
    +public class AsynchronousOperationResult<V> implements AsynchronouslyCreatedResource<V> {
    +
    +	private static final String FIELD_NAME_STATUS = "status";
    +
    +	private static final String FIELD_NAME_OPERATION = "operation";
    +
    +	@JsonProperty(FIELD_NAME_STATUS)
    +	private final QueueStatus queueStatus;
    +
    +	@JsonProperty(FIELD_NAME_OPERATION)
    +	@Nullable
    +	private final V value;
    +
    +	@JsonCreator
    +	private AsynchronousOperationResult(
    +		@JsonProperty(FIELD_NAME_STATUS) QueueStatus queueStatus,
    --- End diff --
    
    nit: indentation


---

[GitHub] flink issue #5451: [FLINK-8632] [flip6] Introduce generalized asynchronous o...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/5451
  
    Thanks for the review @GJL. I addressed your feedback and rebased onto the latest soon to be master.


---

[GitHub] flink pull request #5451: [FLINK-8632] [flip6] Introduce generalized asynchr...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5451#discussion_r169071363
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlers.java ---
    @@ -0,0 +1,298 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.async;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.runtime.concurrent.FutureUtils;
    +import org.apache.flink.runtime.rest.NotFoundException;
    +import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.RestHandlerException;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.rest.messages.MessageParameters;
    +import org.apache.flink.runtime.rest.messages.RequestBody;
    +import org.apache.flink.runtime.rest.messages.TriggerId;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.types.Either;
    +import org.apache.flink.util.FlinkException;
    +
    +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
    +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
    +
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +import javax.annotation.concurrent.ThreadSafe;
    +
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * HTTP handlers for asynchronous operations.
    + *
    + * <p>Some operations long-running. To avoid blocking HTTP
    --- End diff --
    
    nit: grammatical error: *Some operations long-running*


---

[GitHub] flink pull request #5451: [FLINK-8632] [flip6] Introduce generalized asynchr...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5451#discussion_r169687251
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java ---
    @@ -109,9 +97,7 @@
      * }
      * </pre>
    --- End diff --
    
    You removed the trigger id from the savepoint response but the Javadoc still has it:
    ```
     * <pre>
     * {
     *     "status": {
     *         "id": "COMPLETED"
     *     },
     *     "savepoint": {
     *         "request-id": "7d273f5a62eb4730b9dea8e833733c1e",
     *         "location": "/tmp/savepoint-d9813b-8a68e674325b"
     *     }
     * }
    ```


---

[GitHub] flink pull request #5451: [FLINK-8632] [flip6] Introduce generalized asynchr...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5451#discussion_r169705753
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java ---
    @@ -109,9 +97,7 @@
      * }
      * </pre>
    --- End diff --
    
    Will remove it.


---

[GitHub] flink pull request #5451: [FLINK-8632] [flip6] Introduce generalized asynchr...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5451#discussion_r169694927
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest.java ---
    @@ -0,0 +1,350 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.async;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.runtime.concurrent.FutureUtils;
    +import org.apache.flink.runtime.rest.HttpMethodWrapper;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.HandlerRequestException;
    +import org.apache.flink.runtime.rest.handler.RestHandlerException;
    +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.rest.messages.MessageParameters;
    +import org.apache.flink.runtime.rest.messages.MessagePathParameter;
    +import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
    +import org.apache.flink.runtime.rest.messages.TriggerId;
    +import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
    +import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.runtime.webmonitor.retriever.TestingGatewayRetriever;
    +import org.apache.flink.util.ExceptionUtils;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.ExecutionException;
    +
    +import static org.hamcrest.Matchers.containsString;
    +import static org.hamcrest.Matchers.is;
    +import static org.junit.Assert.assertThat;
    +import static org.junit.Assert.fail;
    +
    +/**
    + * Tests for the {@link AbstractAsynchronousOperationHandlers}.
    + */
    +public class AbstractAsynchronousOperationHandlersTest extends TestLogger {
    +
    +	private static final CompletableFuture<String> localRestAddress = CompletableFuture.completedFuture("localhost");
    +
    +	private static final Time timeout = Time.seconds(10L);
    --- End diff --
    
    I think we should make all constants (`static final`) uppercase. 


---

[GitHub] flink pull request #5451: [FLINK-8632] [flip6] Introduce generalized asynchr...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5451#discussion_r169705447
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/TestingGatewayRetriever.java ---
    @@ -0,0 +1,46 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.retriever;
    +
    +import org.apache.flink.runtime.rpc.RpcGateway;
    +
    +import java.util.concurrent.CompletableFuture;
    +
    +/**
    + * Testing {@link GatewayRetriever} implementation which can be
    + * manually set.
    + *
    + * @param <T> type of the gateway
    + */
    +public class TestingGatewayRetriever<T extends RpcGateway> implements GatewayRetriever<T> {
    +	private CompletableFuture<T> gatewayFuture;
    +
    +	public TestingGatewayRetriever() {
    +		gatewayFuture = new CompletableFuture<>();
    +	}
    +
    +	@Override
    +	public CompletableFuture<T> getFuture() {
    +		return null;
    --- End diff --
    
    Indeed :-)


---

[GitHub] flink pull request #5451: [FLINK-8632] [flip6] Introduce generalized asynchr...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5451#discussion_r169706102
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest.java ---
    @@ -0,0 +1,350 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.async;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.runtime.concurrent.FutureUtils;
    +import org.apache.flink.runtime.rest.HttpMethodWrapper;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.HandlerRequestException;
    +import org.apache.flink.runtime.rest.handler.RestHandlerException;
    +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.rest.messages.MessageParameters;
    +import org.apache.flink.runtime.rest.messages.MessagePathParameter;
    +import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
    +import org.apache.flink.runtime.rest.messages.TriggerId;
    +import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
    +import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.runtime.webmonitor.retriever.TestingGatewayRetriever;
    +import org.apache.flink.util.ExceptionUtils;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.ExecutionException;
    +
    +import static org.hamcrest.Matchers.containsString;
    +import static org.hamcrest.Matchers.is;
    +import static org.junit.Assert.assertThat;
    +import static org.junit.Assert.fail;
    +
    +/**
    + * Tests for the {@link AbstractAsynchronousOperationHandlers}.
    + */
    +public class AbstractAsynchronousOperationHandlersTest extends TestLogger {
    +
    +	private static final CompletableFuture<String> localRestAddress = CompletableFuture.completedFuture("localhost");
    +
    +	private static final Time timeout = Time.seconds(10L);
    --- End diff --
    
    You're right. Will fix it.


---