Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2017/10/11 13:06:38 UTC

[GitHub] flink pull request #4802: [FLINK-7791] [REST][client] Integrate LIST command...

GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/4802

     [FLINK-7791] [REST][client] Integrate LIST command into RestClusterClient

    ## What is the purpose of the change
    
    This PR integrates the LIST command into the RestClusterClient. This is a fully working implementation and leverages the CurrentJobsOverviewHandlerHeaders.
    
    ## Brief change log
    
    * move LIST logic from CliFrontend into ClusterClient
    * separate CliFrontendListCancelTest into distinct test cases for list & cancel
    * implement LIST logic for RestClusterClient, based on the CurrentJobsOverviewHandlerHeaders
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
    
    * the changes to the CliFrontend are covered by modified tests in CliFrontendListTest
    * the changes to the ClusterClient are covered by new tests in ClusterClientTest
    * the changes to the RestClusterClient are covered by RestClusterClientTest#testListJobs
    
    For manual verification:
    * start a flip6 cluster with `start-cluster.sh flip6`
    * submit a long-running job with `flink run -flip6 <jar>`
    * execute the list command with `flink list -flip6`
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not documented)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 7791

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4802.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4802
    
----
commit 4463158258aff8f63c47b96dd4d3f10e20c1987f
Author: zentol <ch...@apache.org>
Date:   2017-10-11T11:43:40Z

    [FLINK-7807] [REST] Log exceptions in HandlerUtils methods

commit cc80c685aab4daf3aa7a0fd320e99627cbba8b8c
Author: zentol <ch...@apache.org>
Date:   2017-10-11T11:47:38Z

    [FLINK-7808] [REST] JobDetails constructor checks size of tasksPerState argument

commit 274c3436c7d5566ad16d422b28a3e491ac58096b
Author: zentol <ch...@apache.org>
Date:   2017-10-10T14:52:10Z

    [FLINK-7791] [Client] Move LIST logic into ClusterClient

commit eb6beff15f37edd8efd68791526a930e44c1f007
Author: zentol <ch...@apache.org>
Date:   2017-10-11T11:58:59Z

    [FLINK-7791] [REST][client] Integrate LIST command into RestClusterClient

----


---

[GitHub] flink pull request #4802: [FLINK-7791] [REST][client] Integrate LIST command...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4802#discussion_r144027945
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
    @@ -420,89 +420,72 @@ protected int list(String[] args) {
     		}
     
     		try {
    -			ActorGateway jobManagerGateway = getJobManagerGateway(options);
    -
    -			LOG.info("Connecting to JobManager to retrieve list of jobs");
    -			Future<Object> response = jobManagerGateway.ask(
    -				JobManagerMessages.getRequestRunningJobsStatus(),
    -				clientTimeout);
    +			CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(options.getCommandLine());
    +			ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory);
     
    -			Object result;
    +			Collection<JobDetails> jobDetails;
     			try {
    -				result = Await.result(response, clientTimeout);
    -			}
    -			catch (Exception e) {
    -				throw new Exception("Could not retrieve running jobs from the JobManager.", e);
    +				CompletableFuture<Collection<JobDetails>> jobDetailsFuture = client.listJobs();
    --- End diff --
    
    I can change it to that.


---

[GitHub] flink issue #4802: [FLINK-7791] [REST][client] Integrate LIST command into R...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/4802
  
    I've modified the clients to return both running and finished jobs.
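Returning both running and finished jobs essentially means merging the two lists from the REST response before completing the future. A minimal sketch, assuming a simplified response type with `getRunning()` and `getFinished()` accessors; `JobsOverview` and `JobSummary` are hypothetical stand-ins, not Flink's actual `MultipleJobsDetails`/`JobDetails` classes:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ListAllJobsSketch {

    /** Hypothetical, simplified job entry (Flink's real type is JobDetails). */
    static final class JobSummary {
        final String jobId;
        final String status;

        JobSummary(String jobId, String status) {
            this.jobId = jobId;
            this.status = status;
        }
    }

    /** Hypothetical stand-in for a REST response holding running and finished jobs. */
    static final class JobsOverview {
        private final List<JobSummary> running;
        private final List<JobSummary> finished;

        JobsOverview(List<JobSummary> running, List<JobSummary> finished) {
            this.running = running;
            this.finished = finished;
        }

        List<JobSummary> getRunning() { return running; }

        List<JobSummary> getFinished() { return finished; }
    }

    /** Merges both lists so that the caller (e.g. the CLI) decides what to show. */
    static CompletableFuture<Collection<JobSummary>> listJobs(CompletableFuture<JobsOverview> response) {
        return response.thenApply(overview -> {
            List<JobSummary> all = new ArrayList<>(overview.getRunning());
            all.addAll(overview.getFinished());
            return all;
        });
    }

    public static void main(String[] args) {
        JobsOverview overview = new JobsOverview(
            Arrays.asList(new JobSummary("a", "RUNNING")),
            Arrays.asList(new JobSummary("b", "FINISHED"), new JobSummary("c", "CANCELED")));
        Collection<JobSummary> jobs = listJobs(CompletableFuture.completedFuture(overview)).join();
        System.out.println(jobs.size()); // prints 3
    }
}
```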


---

[GitHub] flink issue #4802: [FLINK-7791] [REST][client] Integrate LIST command into R...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/4802
  
    What do you mean when you say triggerSavepoint is not a command of the ClusterClient? It does have a `triggerSavepoint` method.


---

[GitHub] flink pull request #4802: [FLINK-7791] [REST][client] Integrate LIST command...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4802#discussion_r144011359
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
    @@ -192,6 +196,18 @@ public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirecto
     			.thenApply(response -> response.location);
     	}
     
    +	@Override
    +	public CompletableFuture<Collection<JobDetails>> listJobs() throws Exception {
    +		CurrentJobsOverviewHandlerHeaders headers = CurrentJobsOverviewHandlerHeaders.getInstance();
    +		CompletableFuture<MultipleJobsDetails> jobDetailsFuture = restClient.sendRequest(
    +			restClusterClientConfiguration.getRestServerAddress(),
    +			restClusterClientConfiguration.getRestServerPort(),
    +			headers
    +		);
    +		return jobDetailsFuture
    +			.thenApply(MultipleJobsDetails::getRunning);
    --- End diff --
    
    We don't _have_ to, as the LIST command only cares about running jobs, but we certainly could just return all of them. The CLI filters the jobs anyway.


---

[GitHub] flink pull request #4802: [FLINK-7791] [REST][client] Integrate LIST command...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4802#discussion_r144009864
  
    --- Diff: flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java ---
    @@ -134,6 +140,25 @@ public void testClusterClientSavepoint() throws Exception {
     		}
     	}
     
    +	@Test
    +	public void testClusterClientList() throws Exception {
    +		Configuration config = new Configuration();
    +		config.setString(JobManagerOptions.ADDRESS, "localhost");
    +
    +		TestListActorGateway gateway = new TestListActorGateway();
    +		ClusterClient clusterClient = new TestClusterClient(config, gateway);
    +		try {
    +			CompletableFuture<Collection<JobDetails>> jobDetailsFuture = clusterClient.listJobs();
    +			Collection<JobDetails> jobDetails = jobDetailsFuture.get();
    +			Assert.assertTrue(gateway.messageArrived);
    +			// finished jobs should be ignored
    +			Assert.assertEquals(1, jobDetails.size());
    --- End diff --
    
    No, the LIST command only lists running/scheduled jobs.
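If the client returns every job, the CLI still has to pick out the running and scheduled ones. A minimal sketch of that filtering step; the `JobState` enum and `Job` class are hypothetical stand-ins for Flink's `JobStatus` and `JobDetails`, and treating RUNNING/RESTARTING as "running" and CREATED as "scheduled" is an assumption made for illustration:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

public class ListFilterSketch {

    /** Hypothetical subset of job states, standing in for Flink's JobStatus. */
    enum JobState { CREATED, RUNNING, RESTARTING, FINISHED, CANCELED, FAILED }

    /** Hypothetical job entry, standing in for JobDetails. */
    static final class Job {
        final String id;
        final JobState state;

        Job(String id, JobState state) {
            this.id = id;
            this.state = state;
        }
    }

    /** Jobs listed as running (assumed: RUNNING or RESTARTING). */
    static List<Job> runningJobs(Collection<Job> all) {
        return all.stream()
            .filter(j -> j.state == JobState.RUNNING || j.state == JobState.RESTARTING)
            .collect(Collectors.toList());
    }

    /** Jobs listed as scheduled (assumed: CREATED, i.e. not yet running). */
    static List<Job> scheduledJobs(Collection<Job> all) {
        return all.stream()
            .filter(j -> j.state == JobState.CREATED)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Job> all = Arrays.asList(
            new Job("a", JobState.RUNNING),
            new Job("b", JobState.CREATED),
            new Job("c", JobState.FINISHED));
        // Finished jobs fall into neither group, so LIST does not print them.
        System.out.println(runningJobs(all).size() + " running, " + scheduledJobs(all).size() + " scheduled");
        // prints "1 running, 1 scheduled"
    }
}
```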


---

[GitHub] flink pull request #4802: [FLINK-7791] [REST][client] Integrate LIST command...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4802#discussion_r144016836
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
    @@ -420,89 +420,72 @@ protected int list(String[] args) {
     		}
     
     		try {
    -			ActorGateway jobManagerGateway = getJobManagerGateway(options);
    -
    -			LOG.info("Connecting to JobManager to retrieve list of jobs");
    -			Future<Object> response = jobManagerGateway.ask(
    -				JobManagerMessages.getRequestRunningJobsStatus(),
    -				clientTimeout);
    +			CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(options.getCommandLine());
    +			ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory);
     
    -			Object result;
    +			Collection<JobDetails> jobDetails;
     			try {
    -				result = Await.result(response, clientTimeout);
    -			}
    -			catch (Exception e) {
    -				throw new Exception("Could not retrieve running jobs from the JobManager.", e);
    +				CompletableFuture<Collection<JobDetails>> jobDetailsFuture = client.listJobs();
    --- End diff --
    
    Even if we change how we retrieve them, we can still wrap them in a JobDetails object; how we retrieve them doesn't necessarily leak to the outside. Given that the CLI requires the job id, job name, start/end timestamp and job status, we wouldn't save much by creating a slimmed-down JobDetails class.


---

[GitHub] flink pull request #4802: [FLINK-7791] [REST][client] Integrate LIST command...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4802#discussion_r144004816
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
    @@ -420,89 +420,72 @@ protected int list(String[] args) {
     		}
     
     		try {
    -			ActorGateway jobManagerGateway = getJobManagerGateway(options);
    -
    -			LOG.info("Connecting to JobManager to retrieve list of jobs");
    -			Future<Object> response = jobManagerGateway.ask(
    -				JobManagerMessages.getRequestRunningJobsStatus(),
    -				clientTimeout);
    +			CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(options.getCommandLine());
    +			ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory);
     
    -			Object result;
    +			Collection<JobDetails> jobDetails;
     			try {
    -				result = Await.result(response, clientTimeout);
    -			}
    -			catch (Exception e) {
    -				throw new Exception("Could not retrieve running jobs from the JobManager.", e);
    +				CompletableFuture<Collection<JobDetails>> jobDetailsFuture = client.listJobs();
    --- End diff --
    
    Should we hide the fact that we are retrieving `JobDetails` internally and still only return a `Collection<JobStatus>`? The reason is that in the future we might change how we retrieve the job ids, job names plus their state.


---

[GitHub] flink pull request #4802: [FLINK-7791] [REST][client] Integrate LIST command...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4802#discussion_r144005365
  
    --- Diff: flink-clients/src/test/java/org/apache/flink/client/CliFrontendListTest.java ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.client;
    +
    +import org.apache.flink.client.util.MockedCliFrontend;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +import org.mockito.Mockito;
    +
    +import java.util.Collections;
    +import java.util.concurrent.CompletableFuture;
    +
    +import static org.junit.Assert.assertTrue;
    +import static org.mockito.Mockito.times;
    +import static org.powermock.api.mockito.PowerMockito.when;
    +
    +/**
    + * Tests for the CANCEL command.
    --- End diff --
    
    typo: LIST command.


---

[GitHub] flink pull request #4802: [FLINK-7791] [REST][client] Integrate LIST command...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4802#discussion_r144019412
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
    @@ -420,89 +420,72 @@ protected int list(String[] args) {
     		}
     
     		try {
    -			ActorGateway jobManagerGateway = getJobManagerGateway(options);
    -
    -			LOG.info("Connecting to JobManager to retrieve list of jobs");
    -			Future<Object> response = jobManagerGateway.ask(
    -				JobManagerMessages.getRequestRunningJobsStatus(),
    -				clientTimeout);
    +			CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(options.getCommandLine());
    +			ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory);
     
    -			Object result;
    +			Collection<JobDetails> jobDetails;
     			try {
    -				result = Await.result(response, clientTimeout);
    -			}
    -			catch (Exception e) {
    -				throw new Exception("Could not retrieve running jobs from the JobManager.", e);
    +				CompletableFuture<Collection<JobDetails>> jobDetailsFuture = client.listJobs();
    --- End diff --
    
    I was thinking of the `JobStatusMessage` class. I'm not entirely sure, though: whatever you expose can be used, and once we change things so that we no longer retrieve the full set of details for a job, it will be difficult to change it.
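One way to address this concern is to keep the detailed type internal and expose only a narrow view of it. The sketch below is a hypothetical illustration, not Flink's `JobStatusMessage`: `InternalJobDetails` and `JobStatusView` are invented names, and the view carries only the fields the LIST output is said to need (job id, job name, start/end timestamp, status).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class JobViewSketch {

    /** Hypothetical internal type that carries more than the CLI needs. */
    static final class InternalJobDetails {
        final String jobId;
        final String jobName;
        final long startTime;
        final long endTime;
        final String status;
        final int[] tasksPerState; // internal detail that should not leak to callers

        InternalJobDetails(String jobId, String jobName, long startTime, long endTime,
                           String status, int[] tasksPerState) {
            this.jobId = jobId;
            this.jobName = jobName;
            this.startTime = startTime;
            this.endTime = endTime;
            this.status = status;
            this.tasksPerState = tasksPerState;
        }
    }

    /** Hypothetical public view: only the fields the LIST output uses. */
    static final class JobStatusView {
        final String jobId;
        final String jobName;
        final long startTime;
        final long endTime;
        final String status;

        JobStatusView(String jobId, String jobName, long startTime, long endTime, String status) {
            this.jobId = jobId;
            this.jobName = jobName;
            this.startTime = startTime;
            this.endTime = endTime;
            this.status = status;
        }
    }

    /** Maps internal details to views, so the internal type can change without breaking callers. */
    static List<JobStatusView> toViews(Collection<InternalJobDetails> details) {
        List<JobStatusView> views = new ArrayList<>(details.size());
        for (InternalJobDetails d : details) {
            views.add(new JobStatusView(d.jobId, d.jobName, d.startTime, d.endTime, d.status));
        }
        return views;
    }

    public static void main(String[] args) {
        InternalJobDetails d = new InternalJobDetails("id-1", "WordCount", 1000L, -1L, "RUNNING", new int[] {0, 4, 0});
        JobStatusView v = toViews(Arrays.asList(d)).get(0);
        System.out.println(v.jobName + " " + v.status); // prints "WordCount RUNNING"
    }
}
```

As noted elsewhere in the thread, such a view ends up mirroring most of the detailed type's fields, which is exactly the trade-off under discussion.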


---

[GitHub] flink issue #4802: [FLINK-7791] [REST][client] Integrate LIST command into R...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4802
  
    I'm not entirely sure whether this "Waiting..." is consistent. The current `list` implementation does not print it. Moreover, so far all `ClusterClient` operations are synchronous (e.g. `stop` and `cancel`). In general I don't have strong feelings about returning a future or the result, but I think so far we have always made these calls synchronous, so this change diverges from that.


---

[GitHub] flink issue #4802: [FLINK-7791] [REST][client] Integrate LIST command into R...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4802
  
    But `triggerSavepoint` is not a command of the `ClusterClient` but of the `CliFrontend`. All other `ClusterClient` commands like `stop` and `cancel` behave differently.


---

[GitHub] flink pull request #4802: [FLINK-7791] [REST][client] Integrate LIST command...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4802#discussion_r144006709
  
    --- Diff: flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java ---
    @@ -134,6 +140,25 @@ public void testClusterClientSavepoint() throws Exception {
     		}
     	}
     
    +	@Test
    +	public void testClusterClientList() throws Exception {
    +		Configuration config = new Configuration();
    +		config.setString(JobManagerOptions.ADDRESS, "localhost");
    +
    +		TestListActorGateway gateway = new TestListActorGateway();
    +		ClusterClient clusterClient = new TestClusterClient(config, gateway);
    +		try {
    +			CompletableFuture<Collection<JobDetails>> jobDetailsFuture = clusterClient.listJobs();
    +			Collection<JobDetails> jobDetails = jobDetailsFuture.get();
    +			Assert.assertTrue(gateway.messageArrived);
    +			// finished jobs should be ignored
    +			Assert.assertEquals(1, jobDetails.size());
    --- End diff --
    
    Shouldn't we also retrieve the finished jobs here?


---

[GitHub] flink issue #4802: [FLINK-7791] [REST][client] Integrate LIST command into R...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4802
  
    True, you're right. I guess I overlooked it. But it also includes the `trigger` prefix, which indicates that it does not take a savepoint but only triggers it. If we want to be consistent, then we would also return a future for the `stop` and `cancel` calls, on which you could wait if you want to wait for the call to complete. But I guess this part will be reworked anyway when we refactor the client in the future. Thus, I guess you can merge the PR.


---

[GitHub] flink pull request #4802: [FLINK-7791] [REST][client] Integrate LIST command...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4802#discussion_r144007707
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java ---
    @@ -684,6 +688,29 @@ public void stop(final JobID jobId) throws Exception {
     	}
     
     	/**
    +	 * Lists the currently running jobs on the cluster.
    +	 *
    +	 * @return future collection of running jobs
    +	 * @throws Exception if  no connection to the cluster could be established
    +	 */
    +	public CompletableFuture<Collection<JobDetails>> listJobs() throws Exception {
    --- End diff --
    
    I'm wondering whether the `ClusterClient` should perform a synchronous operation instead of an asynchronous one (simply waiting on the future's completion). So far all the `ClusterClient` operations are blocking and wait for completion. I'm not entirely sure which is more intuitive for the user. Of course, the asynchronous variant gives more freedom.
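The two options differ only in who does the waiting. A minimal sketch, assuming a hypothetical `listJobsAsync()` that stands in for `ClusterClient#listJobs()`: the blocking variant simply waits on the future with a bounded timeout, while the asynchronous variant leaves that decision to the caller.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BlockingListSketch {

    /** Hypothetical asynchronous call, standing in for ClusterClient#listJobs(). */
    static CompletableFuture<Collection<String>> listJobsAsync() {
        return CompletableFuture.supplyAsync(() -> {
            Collection<String> jobs = Arrays.asList("job-1", "job-2");
            return jobs;
        });
    }

    /** Blocking variant: waits for the future's completion, bounded by a timeout. */
    static Collection<String> listJobsBlocking(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        return listJobsAsync().get(timeout, unit);
    }

    public static void main(String[] args) throws Exception {
        // Asynchronous use: the caller decides when (and whether) to wait.
        listJobsAsync().thenAccept(jobs -> System.out.println("async: " + jobs)).join();
        // Synchronous use: the call returns only once the result is available.
        System.out.println("blocking: " + listJobsBlocking(10, TimeUnit.SECONDS));
    }
}
```

Exposing only the future-returning method still lets callers block via `get`/`join`, whereas a blocking-only API cannot be turned back into a non-blocking one.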


---

[GitHub] flink pull request #4802: [FLINK-7791] [REST][client] Integrate LIST command...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4802#discussion_r144005254
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
    @@ -192,6 +196,18 @@ public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirecto
     			.thenApply(response -> response.location);
     	}
     
    +	@Override
    +	public CompletableFuture<Collection<JobDetails>> listJobs() throws Exception {
    +		CurrentJobsOverviewHandlerHeaders headers = CurrentJobsOverviewHandlerHeaders.getInstance();
    +		CompletableFuture<MultipleJobsDetails> jobDetailsFuture = restClient.sendRequest(
    +			restClusterClientConfiguration.getRestServerAddress(),
    +			restClusterClientConfiguration.getRestServerPort(),
    +			headers
    +		);
    +		return jobDetailsFuture
    +			.thenApply(MultipleJobsDetails::getRunning);
    --- End diff --
    
    I think we also have to return the list of completed jobs, not only the running ones.


---

[GitHub] flink issue #4802: [FLINK-7791] [REST][client] Integrate LIST command into R...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/4802
  
    merging.


---

[GitHub] flink issue #4802: [FLINK-7791] [REST][client] Integrate LIST command into R...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/4802
  
    `triggerSavepoint` is also asynchronous. It is true that the list command does not print "Waiting...", but the savepoint commands do.


---

[GitHub] flink issue #4802: [FLINK-7791] [REST][client] Integrate LIST command into R...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/4802
  
    The reason I went the CompletableFuture route is to stay close to the current code. The CliFrontend waits for the response and prints "Waiting...", which we can't do if the call is synchronous, since the ClusterClient shouldn't print anything.
    
    That said, I do like making them asynchronous in general.
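The split described here keeps all console output in the CLI: the client only returns a future, and the frontend prints a progress message before blocking on it. A minimal sketch; `listJobs()` and `list(PrintStream)` are hypothetical stand-ins for `ClusterClient#listJobs()` and `CliFrontend#list`, and the exact message text is illustrative:

```java
import java.io.PrintStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

public class CliWaitingSketch {

    /** Hypothetical client call: issues the request and never prints anything. */
    static CompletableFuture<Collection<String>> listJobs() {
        return CompletableFuture.supplyAsync(() -> {
            Collection<String> jobs = Arrays.asList("job-1", "job-2");
            return jobs;
        });
    }

    /** Hypothetical CLI handler: owns all output, including the progress message. */
    static int list(PrintStream out) {
        CompletableFuture<Collection<String>> jobsFuture = listJobs();
        out.println("Waiting for response...");
        Collection<String> jobs = jobsFuture.join(); // only the CLI blocks
        for (String job : jobs) {
            out.println(job);
        }
        return 0;
    }

    public static void main(String[] args) {
        list(System.out);
    }
}
```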


---

[GitHub] flink pull request #4802: [FLINK-7791] [REST][client] Integrate LIST command...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4802


---