You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/10/15 16:43:36 UTC
[5/5] flink git commit: [FLINK-7791] [REST][client] Integrate LIST command into RestClusterClient
[FLINK-7791] [REST][client] Integrate LIST command into RestClusterClient
This closes #4802.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a76de286
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a76de286
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a76de286
Branch: refs/heads/master
Commit: a76de2860d2ee78e673c97fba3394f099389ad75
Parents: e572961
Author: zentol <ch...@apache.org>
Authored: Wed Oct 11 13:58:59 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sun Oct 15 16:43:58 2017 +0200
----------------------------------------------------------------------
.../client/program/rest/RestClusterClient.java | 22 +++++++
.../program/rest/RestClusterClientTest.java | 61 ++++++++++++++++++++
2 files changed, 83 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a76de286/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 8ae18af..3916514 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -27,12 +27,15 @@ import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
@@ -49,6 +52,8 @@ import javax.annotation.Nullable;
import java.net.InetSocketAddress;
import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -192,6 +197,23 @@ public class RestClusterClient extends ClusterClient {
.thenApply(response -> response.location);
}
+	/**
+	 * Requests the jobs overview from the REST server and flattens it into a single
+	 * collection of {@link JobStatusMessage}s.
+	 *
+	 * <p>Running jobs are added first, followed by finished jobs.
+	 *
+	 * @return future that completes with one status message per job contained in the overview
+	 */
+	@Override
+	public CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception {
+		CurrentJobsOverviewHandlerHeaders overviewHeaders = CurrentJobsOverviewHandlerHeaders.getInstance();
+		CompletableFuture<MultipleJobsDetails> overviewFuture = restClient.sendRequest(
+			restClusterClientConfiguration.getRestServerAddress(),
+			restClusterClientConfiguration.getRestServerPort(),
+			overviewHeaders
+		);
+		return overviewFuture.thenApply(overview -> {
+			Collection<JobStatusMessage> statusMessages = new ArrayList<>();
+			// Running and finished jobs are converted the same way; only the source collection differs.
+			overview.getRunning().forEach(job ->
+				statusMessages.add(
+					new JobStatusMessage(job.getJobId(), job.getJobName(), job.getStatus(), job.getStartTime())));
+			overview.getFinished().forEach(job ->
+				statusMessages.add(
+					new JobStatusMessage(job.getJobId(), job.getJobName(), job.getStatus(), job.getStartTime())));
+			return statusMessages;
+		});
+	}
+
// ======================================
// Legacy stuff we actually implement
// ======================================
http://git-wip-us.apache.org/repos/asf/flink/blob/a76de286/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 00c37a6..cd6fc0c 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -23,9 +23,13 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.RestServerEndpoint;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
@@ -34,6 +38,7 @@ import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
@@ -64,6 +69,8 @@ import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -74,6 +81,9 @@ import static org.mockito.Mockito.when;
/**
* Tests for the {@link RestClusterClient}.
+ *
+ * <p>These tests verify that the client uses the appropriate headers for each
+ * request, properly constructs the request bodies/parameters and processes the responses correctly.
*/
public class RestClusterClientTest extends TestLogger {
@@ -257,6 +267,57 @@ public class RestClusterClientTest extends TestLogger {
}
}
+	/**
+	 * Verifies that {@link RestClusterClient#listJobs()} queries the jobs overview handler and
+	 * returns a status message for both the running and the finished job served by
+	 * {@link TestListJobsHandler}.
+	 */
+	@Test
+	public void testListJobs() throws Exception {
+
+		Configuration config = new Configuration();
+		config.setString(JobManagerOptions.ADDRESS, "localhost");
+
+		RestServerEndpointConfiguration rsec = RestServerEndpointConfiguration.fromConfiguration(config);
+
+		TestListJobsHandler listJobsHandler = new TestListJobsHandler();
+
+		// Server endpoint that exposes only the jobs overview handler under test.
+		RestServerEndpoint rse = new RestServerEndpoint(rsec) {
+			@Override
+			protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
+
+				Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>();
+				handlers.add(Tuple2.of(listJobsHandler.getMessageHeaders(), listJobsHandler));
+				return handlers;
+			}
+		};
+
+		RestClusterClient rcc = new RestClusterClient(config);
+		try {
+			rse.start();
+
+			CompletableFuture<Collection<JobStatusMessage>> jobDetailsFuture = rcc.listJobs();
+			Collection<JobStatusMessage> jobDetails = jobDetailsFuture.get();
+			// The handler reports exactly one running and one finished job.
+			Assert.assertEquals("Expected exactly one running and one finished job.", 2, jobDetails.size());
+
+			Iterator<JobStatusMessage> jobDetailsIterator = jobDetails.iterator();
+			JobStatusMessage job1 = jobDetailsIterator.next();
+			JobStatusMessage job2 = jobDetailsIterator.next();
+			Assert.assertNotEquals("The job statuses should not be equal.", job1.getJobState(), job2.getJobState());
+		} finally {
+			// Shut the server down even if the client shutdown fails.
+			try {
+				rcc.shutdown();
+			} finally {
+				rse.shutdown(Time.seconds(5));
+			}
+		}
+	}
+
+ private static class TestListJobsHandler extends TestHandler<EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters> {
+
+ private TestListJobsHandler() {
+ super(CurrentJobsOverviewHandlerHeaders.getInstance());
+ }
+
+ @Override
+ protected CompletableFuture<MultipleJobsDetails> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
+ JobDetails running = new JobDetails(new JobID(), "job1", 0, 0, 0, JobStatus.RUNNING, 0, new int[9], 0);
+ JobDetails finished = new JobDetails(new JobID(), "job2", 0, 0, 0, JobStatus.FINISHED, 0, new int[9], 0);
+ return CompletableFuture.completedFuture(new MultipleJobsDetails(Collections.singleton(running), Collections.singleton(finished)));
+ }
+ }
+
private abstract static class TestHandler<R extends RequestBody, P extends ResponseBody, M extends MessageParameters> extends AbstractRestHandler<DispatcherGateway, R, P, M> {
private TestHandler(MessageHeaders<R, P, M> headers) {