You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/10/15 16:43:36 UTC

[5/5] flink git commit: [FLINK-7791] [REST][client] Integrate LIST command into RestClusterClient

[FLINK-7791] [REST][client] Integrate LIST command into RestClusterClient

This closes #4802.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a76de286
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a76de286
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a76de286

Branch: refs/heads/master
Commit: a76de2860d2ee78e673c97fba3394f099389ad75
Parents: e572961
Author: zentol <ch...@apache.org>
Authored: Wed Oct 11 13:58:59 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sun Oct 15 16:43:58 2017 +0200

----------------------------------------------------------------------
 .../client/program/rest/RestClusterClient.java  | 22 +++++++
 .../program/rest/RestClusterClientTest.java     | 61 ++++++++++++++++++++
 2 files changed, 83 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a76de286/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 8ae18af..3916514 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -27,12 +27,15 @@ import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.RestClient;
 import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
 import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
 import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
 import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
@@ -49,6 +52,8 @@ import javax.annotation.Nullable;
 
 import java.net.InetSocketAddress;
 import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -192,6 +197,23 @@ public class RestClusterClient extends ClusterClient {
 			.thenApply(response -> response.location);
 	}
 
+	@Override
+	public CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception {
+		CurrentJobsOverviewHandlerHeaders headers = CurrentJobsOverviewHandlerHeaders.getInstance();
+		CompletableFuture<MultipleJobsDetails> jobDetailsFuture = restClient.sendRequest(
+			restClusterClientConfiguration.getRestServerAddress(),
+			restClusterClientConfiguration.getRestServerPort(),
+			headers
+		);
+		return jobDetailsFuture
+			.thenApply(details -> {
+				Collection<JobStatusMessage> flattenedDetails = new ArrayList<>();
+				details.getRunning().forEach(detail -> flattenedDetails.add(new JobStatusMessage(detail.getJobId(), detail.getJobName(), detail.getStatus(), detail.getStartTime())));
+				details.getFinished().forEach(detail -> flattenedDetails.add(new JobStatusMessage(detail.getJobId(), detail.getJobName(), detail.getStatus(), detail.getStartTime())));
+				return flattenedDetails;
+			});
+	}
+
 	// ======================================
 	// Legacy stuff we actually implement
 	// ======================================

http://git-wip-us.apache.org/repos/asf/flink/blob/a76de286/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 00c37a6..cd6fc0c 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -23,9 +23,13 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.RestServerEndpoint;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
@@ -34,6 +38,7 @@ import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
 import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
@@ -64,6 +69,8 @@ import javax.annotation.Nonnull;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -74,6 +81,9 @@ import static org.mockito.Mockito.when;
 
 /**
  * Tests for the {@link RestClusterClient}.
+ *
+ * <p>These tests verify that the client uses the appropriate headers for each
+ * request, properly constructs the request bodies/parameters and processes the responses correctly.
  */
 public class RestClusterClientTest extends TestLogger {
 
@@ -257,6 +267,57 @@ public class RestClusterClientTest extends TestLogger {
 		}
 	}
 
+	@Test
+	public void testListJobs() throws Exception {
+
+		Configuration config = new Configuration();
+		config.setString(JobManagerOptions.ADDRESS, "localhost");
+
+		RestServerEndpointConfiguration rsec = RestServerEndpointConfiguration.fromConfiguration(config);
+
+		TestListJobsHandler listJobsHandler = new TestListJobsHandler();
+
+		RestServerEndpoint rse = new RestServerEndpoint(rsec) {
+			@Override
+			protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
+
+				Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>();
+				handlers.add(Tuple2.of(listJobsHandler.getMessageHeaders(), listJobsHandler));
+				return handlers;
+			}
+		};
+
+		RestClusterClient rcc = new RestClusterClient(config);
+		try {
+			rse.start();
+
+			{
+				CompletableFuture<Collection<JobStatusMessage>> jobDetailsFuture = rcc.listJobs();
+				Collection<JobStatusMessage> jobDetails = jobDetailsFuture.get();
+				Iterator<JobStatusMessage> jobDetailsIterator = jobDetails.iterator();
+				JobStatusMessage job1 = jobDetailsIterator.next();
+				JobStatusMessage job2 = jobDetailsIterator.next();
+				Assert.assertNotEquals("The job statues should not be equal.", job1.getJobState(), job2.getJobState());
+			}
+		} finally {
+			rcc.shutdown();
+			rse.shutdown(Time.seconds(5));
+		}}
+
+	private static class TestListJobsHandler extends TestHandler<EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters> {
+
+		private TestListJobsHandler() {
+			super(CurrentJobsOverviewHandlerHeaders.getInstance());
+		}
+
+		@Override
+		protected CompletableFuture<MultipleJobsDetails> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
+			JobDetails running = new JobDetails(new JobID(), "job1", 0, 0, 0, JobStatus.RUNNING, 0, new int[9], 0);
+			JobDetails finished = new JobDetails(new JobID(), "job2", 0, 0, 0, JobStatus.FINISHED, 0, new int[9], 0);
+			return CompletableFuture.completedFuture(new MultipleJobsDetails(Collections.singleton(running), Collections.singleton(finished)));
+		}
+	}
+
 	private abstract static class TestHandler<R extends RequestBody, P extends ResponseBody, M extends MessageParameters> extends AbstractRestHandler<DispatcherGateway, R, P, M> {
 
 		private TestHandler(MessageHeaders<R, P, M> headers) {