Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2018/05/24 08:19:54 UTC

[GitHub] flink pull request #6069: [FLINK-9416] Make all RestClusterClient calls retr...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/6069

    [FLINK-9416] Make all RestClusterClient calls retriable

    ## What is the purpose of the change
    
    This commit changes the RestClusterClient calls such that they are all retriable with respect
    to connection errors and to the service being temporarily unavailable (return code 503).
    
    Moreover, it changes the retry behaviour for polling the JobResult such that it now fails
    if the cluster returns a NOT_FOUND code.
    
    This PR is based on #6068.
    
    cc @GJL 
    
    ## Verifying this change
    
    - Added `RestClusterClientTest#testRetriableSendOperationIfConnectionErrorOrServiceUnavailable` and `testSendIsNotRetriableIfHttpNotFound`
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink hardenRestClient

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6069.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6069
    
----
commit 4b4d82cc5a3bb9694fd19a37c21345cbe7928962
Author: Till Rohrmann <tr...@...>
Date:   2018-05-23T16:50:27Z

    [FLINK-9427] Fix registration and request slot race condition in TaskExecutor
    
    This commit fixes a race condition between the TaskExecutor and the ResourceManager. Previously,
    the ResourceManager could send a requestSlots message before the TaskExecutor's registration
    had completed. As a result, the TaskExecutor did not yet have all the information it needed
    to accept task submissions.
    
    The problem was that the TaskExecutor sent its SlotReport at registration time, so the
    SlotManager could already assign these slots to pending slot requests. With this commit, the
    registration protocol changes such that the TaskExecutor first registers at the ResourceManager
    and only after completing this step announces its available slots to the SlotManager.
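The new ordering can be illustrated with a toy model. This is a minimal sketch under stated assumptions: `SketchResourceManager`, `SketchTaskExecutor`, `registerTaskExecutor` and `announceSlots` are hypothetical names, not Flink's actual ResourceManager/SlotManager API. Registration carries no slot information, and slots only become assignable once a registered TaskExecutor announces them in a second step.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model of the two-step protocol; not Flink's real classes.
class SketchResourceManager {
    private final Set<String> registeredExecutors = new HashSet<>();
    // Slots visible to the (simulated) SlotManager, keyed by TaskExecutor id.
    private final Map<String, List<String>> announcedSlots = new HashMap<>();

    // Step 1: registration carries no slot information.
    void registerTaskExecutor(String executorId) {
        registeredExecutors.add(executorId);
    }

    // Step 2: slots are announced separately, and only by a registered TaskExecutor.
    boolean announceSlots(String executorId, List<String> slots) {
        if (!registeredExecutors.contains(executorId)) {
            return false; // registration has not completed yet
        }
        announcedSlots.put(executorId, new ArrayList<>(slots));
        return true;
    }

    // Number of slots the SlotManager may hand out to pending slot requests.
    int assignableSlots() {
        int count = 0;
        for (List<String> slots : announcedSlots.values()) {
            count += slots.size();
        }
        return count;
    }
}

// Hypothetical TaskExecutor side: register first, announce slots afterwards.
class SketchTaskExecutor {
    private final String id;
    private final List<String> slots;

    SketchTaskExecutor(String id, List<String> slots) {
        this.id = id;
        this.slots = slots;
    }

    boolean connect(SketchResourceManager resourceManager) {
        resourceManager.registerTaskExecutor(id);
        // Only after registration has completed are the slots made known.
        return resourceManager.announceSlots(id, slots);
    }
}
```

In the real system both steps are asynchronous RPCs; the sketch only captures the ordering constraint, not the messaging.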

commit 4d034edca41294e250c49807a3beecb2b419824d
Author: Till Rohrmann <tr...@...>
Date:   2018-05-23T21:48:38Z

    [FLINK-9421] Remove job from RunningJobsRegistry when it reaches a terminal state
    
    This commit lets the Dispatcher remove the RunningJobsRegistry entry for a completed job
    when it is removed from the Dispatcher.
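The cleanup rule can be sketched as follows. All names here (`SketchJobStatus`, `SketchRunningJobsRegistry`, `SketchDispatcher` and their methods) are hypothetical stand-ins, not Flink's JobStatus, RunningJobsRegistry or Dispatcher. The point is that removing a job from the dispatcher and clearing its registry entry happen in the same place, so no stale entry outlives a job that reached a terminal state.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified job states.
enum SketchJobStatus {
    RUNNING, FINISHED, FAILED, CANCELED;

    boolean isTerminal() {
        return this != RUNNING;
    }
}

// Hypothetical registry of running jobs.
class SketchRunningJobsRegistry {
    private final Map<String, SketchJobStatus> entries = new HashMap<>();

    void setJobRunning(String jobId) {
        entries.put(jobId, SketchJobStatus.RUNNING);
    }

    void clearJob(String jobId) {
        entries.remove(jobId);
    }

    boolean contains(String jobId) {
        return entries.containsKey(jobId);
    }
}

// Hypothetical dispatcher that owns the registry entries of its jobs.
class SketchDispatcher {
    private final SketchRunningJobsRegistry registry;
    private final Map<String, SketchJobStatus> jobs = new HashMap<>();

    SketchDispatcher(SketchRunningJobsRegistry registry) {
        this.registry = registry;
    }

    void submitJob(String jobId) {
        jobs.put(jobId, SketchJobStatus.RUNNING);
        registry.setJobRunning(jobId);
    }

    void onJobStatusChange(String jobId, SketchJobStatus status) {
        if (status.isTerminal()) {
            removeJob(jobId);
        } else {
            jobs.put(jobId, status);
        }
    }

    boolean hasJob(String jobId) {
        return jobs.containsKey(jobId);
    }

    // Removing a job from the dispatcher also clears its registry entry.
    private void removeJob(String jobId) {
        jobs.remove(jobId);
        registry.clearJob(jobId);
    }
}
```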

commit 61817beea5dfa1fdf20dcd6266b5899769307f6b
Author: Till Rohrmann <tr...@...>
Date:   2018-05-24T08:01:23Z

    [FLINK-9416] Make all RestClusterClient calls retriable
    
    This commit changes the RestClusterClient calls such that they are all retriable with respect
    to connection errors and to the service being temporarily unavailable (return code 503).
    
    Moreover, it changes the retry behaviour for polling the JobResult such that it now fails
    if the cluster returns a NOT_FOUND code.
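The retry policy described in this commit message can be sketched as a predicate plus a retry loop. This is a simplified, synchronous sketch under stated assumptions: `SketchRestException`, `SketchRetry.RETRIABLE` and `SketchRetry.retry` are hypothetical names, and the client in this PR is future-based (`CompletableFuture`) rather than blocking. Connection errors and HTTP 503 are retried; anything else, including 404 NOT_FOUND, fails immediately.

```java
import java.net.ConnectException;
import java.util.concurrent.Callable;
import java.util.function.Predicate;

// Hypothetical exception carrying an HTTP status code (not a Flink class).
class SketchRestException extends Exception {
    final int statusCode;

    SketchRestException(int statusCode) {
        super("HTTP " + statusCode);
        this.statusCode = statusCode;
    }
}

final class SketchRetry {
    private SketchRetry() {}

    // Retriable: connection problems and 503 Service Unavailable.
    // Everything else, e.g. 404 NOT_FOUND, is treated as a permanent failure.
    static final Predicate<Throwable> RETRIABLE = t ->
        t instanceof ConnectException
            || (t instanceof SketchRestException && ((SketchRestException) t).statusCode == 503);

    // Blocking retry loop with a fixed delay between attempts.
    static <T> T retry(Callable<T> call, int maxAttempts, long delayMillis,
                       Predicate<Throwable> retriable) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        for (int attempt = 1; ; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                // Give up on non-retriable errors or once the attempt budget is used up.
                if (!retriable.test(e) || attempt == maxAttempts) {
                    throw e;
                }
                Thread.sleep(delayMillis);
            }
        }
    }
}
```

With this split, polling the JobResult stops on the first 404 instead of spinning until the attempt budget is exhausted.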

----


---

[GitHub] flink pull request #6069: [FLINK-9416] Make all RestClusterClient calls retr...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6069#discussion_r190605670
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
    @@ -314,7 +319,8 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
     
     		log.info("Requesting blob server port.");
     		CompletableFuture<BlobServerPortResponseBody> portFuture = sendRequest(
    -			BlobServerPortHeaders.getInstance());
    +			BlobServerPortHeaders.getInstance(),
    +			EmptyMessageParameters.getInstance());
    --- End diff --
    
    This is not needed. There should be an overload of `sendRequest` that supplies the empty message parameters itself.
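The overload the reviewer refers to follows a common convenience pattern: a shorter method delegates to the general one with an empty parameter object. The sketch below is hypothetical (`SketchRestClient` renders a URL instead of sending a request, and plain `Map` parameters stand in for Flink's message-parameter classes); it only illustrates why call sites then need not pass `EmptyMessageParameters.getInstance()` explicitly.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical, simplified client: it renders the request URL instead of sending it.
class SketchRestClient {
    // General form: the caller supplies the message parameters explicitly.
    CompletableFuture<String> sendRequest(String path, Map<String, String> parameters) {
        if (parameters.isEmpty()) {
            return CompletableFuture.completedFuture(path);
        }
        // Sort keys so the rendered query string is deterministic.
        String query = new TreeMap<>(parameters).entrySet().stream()
            .map(e -> e.getKey() + "=" + e.getValue())
            .collect(Collectors.joining("&"));
        return CompletableFuture.completedFuture(path + "?" + query);
    }

    // Convenience overload for messages without parameters: it fills in the
    // empty parameters itself, so call sites stay short.
    CompletableFuture<String> sendRequest(String path) {
        return sendRequest(path, Collections.emptyMap());
    }
}
```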


---

[GitHub] flink issue #6069: [FLINK-9416] Make all RestClusterClient calls retriable

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/6069
  
    Thanks for the review @GJL. I will address your comments and then merge this PR.


---

[GitHub] flink pull request #6069: [FLINK-9416] Make all RestClusterClient calls retr...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6069#discussion_r190605113
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
    @@ -274,11 +275,17 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
     		final JobMessageParameters  params = new JobMessageParameters();
     		params.jobPathParameter.resolve(jobId);
     
    -		CompletableFuture<JobDetailsInfo> responseFuture = sendRequest(detailsHeaders, params);
    +		CompletableFuture<JobDetailsInfo> responseFuture = sendRequest(
    +			detailsHeaders,
    +			params);
     
     		return responseFuture.thenApply(JobDetailsInfo::getJobStatus);
     	}
     
    +	private Predicate<Throwable> isConnectionProblemOrServiceUnavailable() {
    --- End diff --
    
    nit: can be `static` and defined closer to `isConnectionProblemException` and `isServiceUnavailable`
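The nit can be illustrated with static predicate factories composed via `Predicate.or`. The method names mirror the ones mentioned in the review, but their bodies and the `SketchHttpStatusException` type are hypothetical; the diff does not show how Flink implements them. Because the predicates use no instance state, they can be `static` and grouped together.

```java
import java.net.ConnectException;
import java.util.concurrent.CompletionException;
import java.util.function.Predicate;

// Hypothetical exception type carrying an HTTP status code.
class SketchHttpStatusException extends Exception {
    final int status;

    SketchHttpStatusException(int status) {
        super("HTTP " + status);
        this.status = status;
    }
}

final class SketchRetryPredicates {
    private SketchRetryPredicates() {}

    // Composite predicate, defined next to the two predicates it combines.
    static Predicate<Throwable> isConnectionProblemOrServiceUnavailable() {
        return isConnectionProblemException().or(isServiceUnavailable());
    }

    static Predicate<Throwable> isConnectionProblemException() {
        return t -> unwrap(t) instanceof ConnectException;
    }

    static Predicate<Throwable> isServiceUnavailable() {
        return t -> {
            Throwable cause = unwrap(t);
            return cause instanceof SketchHttpStatusException
                && ((SketchHttpStatusException) cause).status == 503;
        };
    }

    // Failed futures typically report errors wrapped in a CompletionException.
    private static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }
}
```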


---

[GitHub] flink pull request #6069: [FLINK-9416] Make all RestClusterClient calls retr...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6069#discussion_r190609681
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
    @@ -274,11 +275,17 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
     		final JobMessageParameters  params = new JobMessageParameters();
     		params.jobPathParameter.resolve(jobId);
     
    -		CompletableFuture<JobDetailsInfo> responseFuture = sendRequest(detailsHeaders, params);
    +		CompletableFuture<JobDetailsInfo> responseFuture = sendRequest(
    +			detailsHeaders,
    +			params);
     
     		return responseFuture.thenApply(JobDetailsInfo::getJobStatus);
     	}
     
    +	private Predicate<Throwable> isConnectionProblemOrServiceUnavailable() {
    --- End diff --
    
    Good idea. Will change it.


---

[GitHub] flink pull request #6069: [FLINK-9416] Make all RestClusterClient calls retr...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6069


---

[GitHub] flink pull request #6069: [FLINK-9416] Make all RestClusterClient calls retr...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6069#discussion_r190609755
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
    @@ -314,7 +319,8 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
     
     		log.info("Requesting blob server port.");
     		CompletableFuture<BlobServerPortResponseBody> portFuture = sendRequest(
    -			BlobServerPortHeaders.getInstance());
    +			BlobServerPortHeaders.getInstance(),
    +			EmptyMessageParameters.getInstance());
    --- End diff --
    
    Good point. Will change it.


---