Posted to commits@flink.apache.org by ch...@apache.org on 2017/10/15 16:43:32 UTC

[1/5] flink git commit: [FLINK-6805] [cassandra] Shade indirect netty4 dependency

Repository: flink
Updated Branches:
  refs/heads/master 47fe61893 -> a76de2860


[FLINK-6805] [cassandra] Shade indirect netty4 dependency

Relocate the various *indirect* netty4 dependencies (version 4.0.33.Final)
into a Flink-specific package namespace using the Maven Shade plugin.

This closes #4545.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/81d7c4e8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/81d7c4e8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/81d7c4e8

Branch: refs/heads/master
Commit: 81d7c4e8b111c6bf52000f68b64c402783a6ae74
Parents: 89394ec
Author: Michael Fong <mc...@gmail.com>
Authored: Tue Aug 15 14:22:23 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Sun Oct 15 16:40:53 2017 +0200

----------------------------------------------------------------------
 flink-connectors/flink-connector-cassandra/pom.xml | 10 ++++++++++
 1 file changed, 10 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/81d7c4e8/flink-connectors/flink-connector-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/pom.xml b/flink-connectors/flink-connector-cassandra/pom.xml
index f006f3a..c97b43f 100644
--- a/flink-connectors/flink-connector-cassandra/pom.xml
+++ b/flink-connectors/flink-connector-cassandra/pom.xml
@@ -72,6 +72,7 @@ under the License.
 									<include>com.datastax.cassandra:cassandra-driver-core</include>
 									<include>com.datastax.cassandra:cassandra-driver-mapping</include>
 									<include>com.google.guava:guava</include>
+									<include>io.netty:netty-*</include>
 								</includes>
 							</artifactSet>
 							<relocations>
@@ -83,6 +84,15 @@ under the License.
 										<exclude>com.google.inject.**</exclude>
 									</excludes>
 								</relocation>
+								<!--
+									For the details of relocation pattern, refer the discussion at
+									https://github.com/apache/flink/pull/4545
+									FLINK-6805
+								-->
+								<relocation>
+									<pattern>io.netty</pattern>
+									<shadedPattern>org.apache.flink.cassandra.shaded.io.netty</shadedPattern>
+								</relocation>
 							</relocations>
 						</configuration>
 					</execution>
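
The effect of the new <relocation> entry can be pictured as a prefix rewrite on class names. The following is a minimal sketch under that assumption; the class RelocationSketch and its relocate method are hypothetical and only illustrate the mapping from <pattern> to <shadedPattern>. It is not how the Maven Shade plugin is implemented.

```java
// Illustrative only: a simplified model of the prefix rewrite described by
// the <relocation> entry. The class and method names are hypothetical.
final class RelocationSketch {

    // Values copied from the <pattern> and <shadedPattern> elements in the diff.
    static final String PATTERN = "io.netty";
    static final String SHADED_PATTERN = "org.apache.flink.cassandra.shaded.io.netty";

    /** Returns the relocated class name, or the input unchanged if it is outside the pattern. */
    static String relocate(String className) {
        if (className.equals(PATTERN) || className.startsWith(PATTERN + ".")) {
            return SHADED_PATTERN + className.substring(PATTERN.length());
        }
        return className;
    }

    public static void main(String[] args) {
        // A netty class is moved under the Flink-specific prefix.
        System.out.println(relocate("io.netty.channel.Channel"));
        // A class outside the pattern is left alone by this relocation.
        System.out.println(relocate("com.datastax.driver.core.Cluster"));
    }
}
```

Because the rewritten names live under a connector-specific prefix, a different netty version elsewhere on the classpath should no longer collide with the copy bundled into the connector jar.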


[3/5] flink git commit: [FLINK-7774][network] fix not clearing deserializers on closing an input

[FLINK-7774][network] fix not clearing deserializers on closing an input

This closes #4783.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b6f0558
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b6f0558
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b6f0558

Branch: refs/heads/master
Commit: 4b6f05585bb548d1538ada43ed33149acbc9e6d4
Parents: 47fe618
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon Sep 4 17:21:52 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sun Oct 15 16:40:53 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/io/network/api/reader/AbstractRecordReader.java   | 1 +
 .../org/apache/flink/streaming/runtime/io/StreamInputProcessor.java | 1 +
 .../apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java  | 1 +
 3 files changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4b6f0558/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
index c5aeef7..29f2b6d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
@@ -120,6 +120,7 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra
 			if (buffer != null && !buffer.isRecycled()) {
 				buffer.recycle();
 			}
+			deserializer.clear();
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4b6f0558/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 263077d..609f8b8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -264,6 +264,7 @@ public class StreamInputProcessor<IN> {
 			if (buffer != null && !buffer.isRecycled()) {
 				buffer.recycle();
 			}
+			deserializer.clear();
 		}
 
 		// cleanup the barrier handler resources

http://git-wip-us.apache.org/repos/asf/flink/blob/4b6f0558/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index a25540d..7874147 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -329,6 +329,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 			if (buffer != null && !buffer.isRecycled()) {
 				buffer.recycle();
 			}
+			deserializer.clear();
 		}
 
 		// cleanup the barrier handler resources
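
The three one-line changes above follow the same cleanup pattern: recycle the buffer the deserializer currently holds, then clear the deserializer itself. A minimal sketch of that pattern follows, assuming simplified stand-in types; Buffer and Deserializer here are hypothetical and are not Flink's classes.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: hypothetical stand-ins for a network buffer and a record
// deserializer, used to show the cleanup order from the diff.
final class CleanupSketch {

    static final class Buffer {
        private boolean recycled;
        boolean isRecycled() { return recycled; }
        void recycle() { recycled = true; }
    }

    static final class Deserializer {
        private Buffer current;
        private final List<Byte> partialRecord = new ArrayList<>();

        void setNextBuffer(Buffer buffer, byte[] data) {
            current = buffer;
            for (byte b : data) {
                partialRecord.add(b);
            }
        }

        Buffer getCurrentBuffer() { return current; }

        // Drops the buffer reference and any partially read record.
        void clear() {
            current = null;
            partialRecord.clear();
        }

        boolean holdsState() { return current != null || !partialRecord.isEmpty(); }
    }

    /** Recycles each pending buffer, then clears the deserializer, as in the diff. */
    static void cleanup(Deserializer[] deserializers) {
        for (Deserializer deserializer : deserializers) {
            Buffer buffer = deserializer.getCurrentBuffer();
            if (buffer != null && !buffer.isRecycled()) {
                buffer.recycle();
            }
            deserializer.clear();
        }
    }

    public static void main(String[] args) {
        Deserializer deserializer = new Deserializer();
        Buffer buffer = new Buffer();
        deserializer.setNextBuffer(buffer, new byte[] {1, 2, 3});
        cleanup(new Deserializer[] {deserializer});
        System.out.println(buffer.isRecycled());
        System.out.println(deserializer.holdsState());
    }
}
```

In this sketch, skipping the clear() call would leave the partial record and the buffer reference reachable after the input is closed, which is the kind of leftover state the commit title refers to.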


[4/5] flink git commit: [FLINK-7791] [Client] Move LIST logic into ClusterClient

[FLINK-7791] [Client] Move LIST logic into ClusterClient


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e5729617
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e5729617
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e5729617

Branch: refs/heads/master
Commit: e572961749dd45e3f7444664d9ba967fa438ab55
Parents: 81d7c4e
Author: zentol <ch...@apache.org>
Authored: Tue Oct 10 16:52:10 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sun Oct 15 16:43:51 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    | 119 +++----
 .../flink/client/program/ClusterClient.java     |  31 ++
 .../flink/client/CliFrontendCancelTest.java     | 164 ++++++++++
 .../flink/client/CliFrontendListCancelTest.java | 310 -------------------
 .../flink/client/CliFrontendListTest.java       |  72 +++++
 .../flink/client/program/ClusterClientTest.java |  43 +++
 6 files changed, 361 insertions(+), 378 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e5729617/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index c065453..07c6d65 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -60,7 +60,6 @@ import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
@@ -83,6 +82,7 @@ import java.net.URL;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
@@ -420,89 +420,72 @@ public class CliFrontend {
 		}
 
 		try {
-			ActorGateway jobManagerGateway = getJobManagerGateway(options);
-
-			LOG.info("Connecting to JobManager to retrieve list of jobs");
-			Future<Object> response = jobManagerGateway.ask(
-				JobManagerMessages.getRequestRunningJobsStatus(),
-				clientTimeout);
+			CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(options.getCommandLine());
+			ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory);
 
-			Object result;
+			Collection<JobStatusMessage> jobDetails;
 			try {
-				result = Await.result(response, clientTimeout);
-			}
-			catch (Exception e) {
-				throw new Exception("Could not retrieve running jobs from the JobManager.", e);
+				CompletableFuture<Collection<JobStatusMessage>> jobDetailsFuture = client.listJobs();
+
+				try {
+					logAndSysout("Waiting for response...");
+					jobDetails = jobDetailsFuture.get();
+				}
+				catch (ExecutionException ee) {
+					Throwable cause = ExceptionUtils.stripExecutionException(ee);
+					throw new Exception("Failed to retrieve job list.", cause);
+				}
+			} finally {
+				client.shutdown();
 			}
 
-			if (result instanceof RunningJobsStatus) {
-				LOG.info("Successfully retrieved list of jobs");
+			LOG.info("Successfully retrieved list of jobs");
 
-				List<JobStatusMessage> jobs = ((RunningJobsStatus) result).getStatusMessages();
+			SimpleDateFormat dateFormat = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss");
+			Comparator<JobStatusMessage> startTimeComparator = (o1, o2) -> (int) (o1.getStartTime() - o2.getStartTime());
 
-				ArrayList<JobStatusMessage> runningJobs = null;
-				ArrayList<JobStatusMessage> scheduledJobs = null;
-				if (running) {
-					runningJobs = new ArrayList<JobStatusMessage>();
-				}
-				if (scheduled) {
-					scheduledJobs = new ArrayList<JobStatusMessage>();
+			final List<JobStatusMessage> runningJobs = new ArrayList<>();
+			final List<JobStatusMessage> scheduledJobs = new ArrayList<>();
+			jobDetails.forEach(details -> {
+				if (details.getJobState() == JobStatus.CREATED) {
+					scheduledJobs.add(details);
+				} else {
+					runningJobs.add(details);
 				}
+			});
 
-				for (JobStatusMessage rj : jobs) {
-					if (running && (rj.getJobState().equals(JobStatus.RUNNING)
-							|| rj.getJobState().equals(JobStatus.RESTARTING))) {
-						runningJobs.add(rj);
-					}
-					if (scheduled && rj.getJobState().equals(JobStatus.CREATED)) {
-						scheduledJobs.add(rj);
-					}
+			if (running) {
+				if (runningJobs.size() == 0) {
+					System.out.println("No running jobs.");
 				}
+				else {
+					runningJobs.sort(startTimeComparator);
 
-				SimpleDateFormat df = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss");
-				Comparator<JobStatusMessage> njec = new Comparator<JobStatusMessage>(){
-					@Override
-					public int compare(JobStatusMessage o1, JobStatusMessage o2) {
-						return (int) (o1.getStartTime() - o2.getStartTime());
-					}
-				};
-
-				if (running) {
-					if (runningJobs.size() == 0) {
-						System.out.println("No running jobs.");
-					}
-					else {
-						Collections.sort(runningJobs, njec);
-
-						System.out.println("------------------ Running/Restarting Jobs -------------------");
-						for (JobStatusMessage rj : runningJobs) {
-							System.out.println(df.format(new Date(rj.getStartTime()))
-									+ " : " + rj.getJobId() + " : " + rj.getJobName() + " (" + rj.getJobState() + ")");
-						}
-						System.out.println("--------------------------------------------------------------");
+					System.out.println("------------------ Running/Restarting Jobs -------------------");
+					for (JobStatusMessage runningJob : runningJobs) {
+						System.out.println(dateFormat.format(new Date(runningJob.getStartTime()))
+							+ " : " + runningJob.getJobId() + " : " + runningJob.getJobName() + " (" + runningJob.getJobState() + ")");
 					}
+					System.out.println("--------------------------------------------------------------");
 				}
-				if (scheduled) {
-					if (scheduledJobs.size() == 0) {
-						System.out.println("No scheduled jobs.");
-					}
-					else {
-						Collections.sort(scheduledJobs, njec);
+			}
+			if (scheduled) {
+				if (scheduledJobs.size() == 0) {
+					System.out.println("No scheduled jobs.");
+				}
+				else {
+					scheduledJobs.sort(startTimeComparator);
 
-						System.out.println("----------------------- Scheduled Jobs -----------------------");
-						for (JobStatusMessage rj : scheduledJobs) {
-							System.out.println(df.format(new Date(rj.getStartTime()))
-									+ " : " + rj.getJobId() + " : " + rj.getJobName());
-						}
-						System.out.println("--------------------------------------------------------------");
+					System.out.println("----------------------- Scheduled Jobs -----------------------");
+					for (JobStatusMessage scheduledJob : scheduledJobs) {
+						System.out.println(dateFormat.format(new Date(scheduledJob.getStartTime()))
+							+ " : " + scheduledJob.getJobId() + " : " + scheduledJob.getJobName());
 					}
+					System.out.println("--------------------------------------------------------------");
 				}
-				return 0;
-			}
-			else {
-				throw new Exception("ReqeustRunningJobs requires a response of type " +
-						"RunningJobs. Instead the response is of type " + result.getClass() + ".");
 			}
+
+			return 0;
 		}
 		catch (Throwable t) {
 			return handleError(t);
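
One detail in the hunk above may be worth a closer look: startTimeComparator narrows a long difference to int. In Java that cast keeps only the low 32 bits, so two start times that are 2^31 milliseconds apart (about 25 days) can compare with the wrong sign. The sketch below illustrates this with a hypothetical Job type standing in for JobStatusMessage and shows Comparator.comparingLong as one possible alternative; it is not part of the commit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Illustrative only: Job is a hypothetical stand-in for JobStatusMessage.
final class StartTimeOrderSketch {

    static final class Job {
        final String name;
        final long startTime; // epoch milliseconds

        Job(String name, long startTime) {
            this.name = name;
            this.startTime = startTime;
        }
    }

    // Same shape as the comparator in the diff: narrows a long difference to int.
    static final Comparator<Job> NARROWING = (a, b) -> (int) (a.startTime - b.startTime);

    // Alternative that compares the long values directly, without narrowing.
    static final Comparator<Job> BY_START_TIME = Comparator.comparingLong(job -> job.startTime);

    /** Returns the job names ordered by the given comparator. */
    static List<String> sortedNames(List<Job> jobs, Comparator<Job> order) {
        List<Job> copy = new ArrayList<>(jobs);
        copy.sort(order);
        List<String> names = new ArrayList<>();
        for (Job job : copy) {
            names.add(job.name);
        }
        return names;
    }

    public static void main(String[] args) {
        Job early = new Job("early", 0L);
        Job late = new Job("late", 1L << 31); // 2^31 ms after "early"

        // The narrowing comparator reports "late" as smaller than "early".
        System.out.println(NARROWING.compare(late, early) < 0);
        // The long-based comparator reports "late" as larger.
        System.out.println(BY_START_TIME.compare(late, early) > 0);
        System.out.println(sortedNames(Arrays.asList(late, early), BY_START_TIME));
    }
}
```

Whether this matters in practice depends on how far apart the listed jobs were started; for jobs started within a few days of each other the narrowing comparator orders them correctly.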

http://git-wip-us.apache.org/repos/asf/flink/blob/e5729617/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index eb89f09..06fb42a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobListeningContext;
+import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -54,6 +55,8 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
 import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
 import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
 import org.apache.flink.runtime.util.LeaderConnectionInfo;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.util.FlinkException;
@@ -70,6 +73,8 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
 import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -684,6 +689,32 @@ public abstract class ClusterClient {
 	}
 
 	/**
+	 * Lists the currently running and finished jobs on the cluster.
+	 *
+	 * @return future collection of running and finished jobs
+	 * @throws Exception if no connection to the cluster could be established
+	 */
+	public CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception {
+		final ActorGateway jobManager = getJobManagerGateway();
+
+		Future<Object> response = jobManager.ask(new RequestJobDetails(true, false), timeout);
+		CompletableFuture<Object> responseFuture = FutureUtils.toJava(response);
+
+		return responseFuture.thenApply((responseMessage) -> {
+			if (responseMessage instanceof MultipleJobsDetails) {
+				MultipleJobsDetails details = (MultipleJobsDetails) responseMessage;
+				Collection<JobStatusMessage> flattenedDetails = new ArrayList<>(details.getRunning().size() + details.getFinished().size());
+				details.getRunning().forEach(detail -> flattenedDetails.add(new JobStatusMessage(detail.getJobId(), detail.getJobName(), detail.getStatus(), detail.getStartTime())));
+				details.getFinished().forEach(detail -> flattenedDetails.add(new JobStatusMessage(detail.getJobId(), detail.getJobName(), detail.getStatus(), detail.getStartTime())));
+				return flattenedDetails;
+			} else {
+				throw new CompletionException(
+					new IllegalStateException("Unknown JobManager response of type " + responseMessage.getClass()));
+			}
+		});
+	}
+
+	/**
 	 * Requests and returns the accumulators for the given job identifier. Accumulators can be
 	 * requested while a is running or after it has finished. The default class loader is used
 	 * to deserialize the incoming accumulator results.
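
The new listJobs() method and its caller in CliFrontend split the error handling between them: the thenApply stage rejects an unexpected reply by throwing a CompletionException, and the caller sees the underlying cause through the ExecutionException raised by get(). The sketch below reproduces that flow with JDK types only. JobsReply, flatten and describe are hypothetical names; the sketch does not use Flink's ActorGateway, MultipleJobsDetails or ExceptionUtils.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Illustrative only: JobsReply is a hypothetical stand-in for the typed reply.
final class ListJobsSketch {

    static final class JobsReply {
        final Collection<String> running;
        final Collection<String> finished;

        JobsReply(Collection<String> running, Collection<String> finished) {
            this.running = running;
            this.finished = finished;
        }
    }

    /** Flattens a typed reply into one collection, or fails the future for any other reply. */
    static CompletableFuture<Collection<String>> flatten(CompletableFuture<Object> response) {
        return response.thenApply(message -> {
            if (message instanceof JobsReply) {
                JobsReply reply = (JobsReply) message;
                Collection<String> all =
                    new ArrayList<>(reply.running.size() + reply.finished.size());
                all.addAll(reply.running);
                all.addAll(reply.finished);
                return all;
            }
            throw new CompletionException(
                new IllegalStateException("Unknown response of type " + message.getClass()));
        });
    }

    /** Caller side: waits for the result and reports the cause on failure. */
    static String describe(Object reply) {
        try {
            return "jobs: " + flatten(CompletableFuture.completedFuture(reply)).get();
        } catch (ExecutionException ee) {
            // get() reports the original exception as the cause.
            return "failed: " + ee.getCause().getClass().getSimpleName();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(new JobsReply(Arrays.asList("a"), Arrays.asList("b"))));
        System.out.println(describe("unexpected reply"));
    }
}
```

Throwing CompletionException from inside the lambda keeps the stage's signature free of checked exceptions, while the caller still receives the IllegalStateException as the cause.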

http://git-wip-us.apache.org/repos/asf/flink/blob/e5729617/flink-clients/src/test/java/org/apache/flink/client/CliFrontendCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendCancelTest.java
new file mode 100644
index 0000000..f2508dc
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendCancelTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.cli.CancelOptions;
+import org.apache.flink.client.cli.CliFrontendParser;
+import org.apache.flink.client.cli.Flip6DefaultCLI;
+import org.apache.flink.client.util.MockedCliFrontend;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Matchers.notNull;
+import static org.mockito.Mockito.times;
+import static org.powermock.api.mockito.PowerMockito.doThrow;
+
+/**
+ * Tests for the CANCEL command.
+ */
+public class CliFrontendCancelTest {
+
+	@BeforeClass
+	public static void init() {
+		CliFrontendTestUtils.pipeSystemOutToNull();
+	}
+
+	@Test
+	public void testCancel() {
+		try {
+			// test unrecognized option
+			{
+				String[] parameters = {"-v", "-l"};
+				CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
+				int retCode = testFrontend.cancel(parameters);
+				assertTrue(retCode != 0);
+			}
+
+			// test missing job id
+			{
+				String[] parameters = {};
+				CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
+				int retCode = testFrontend.cancel(parameters);
+				assertTrue(retCode != 0);
+			}
+
+			// test cancel properly
+			{
+				JobID jid = new JobID();
+
+				String[] parameters = { jid.toString() };
+				CancelTestCliFrontend testFrontend = new CancelTestCliFrontend(false);
+
+				int retCode = testFrontend.cancel(parameters);
+				assertTrue(retCode == 0);
+
+				Mockito.verify(testFrontend.client, times(1)).cancel(any(JobID.class));
+			}
+
+			// test cancel properly
+			{
+				JobID jid = new JobID();
+
+				String[] parameters = { jid.toString() };
+				CancelTestCliFrontend testFrontend = new CancelTestCliFrontend(true);
+
+				int retCode = testFrontend.cancel(parameters);
+				assertTrue(retCode != 0);
+
+				Mockito.verify(testFrontend.client, times(1)).cancel(any(JobID.class));
+			}
+
+			// test flip6 switch
+			{
+				String[] parameters =
+					{"-flip6", String.valueOf(new JobID())};
+				CancelOptions options = CliFrontendParser.parseCancelCommand(parameters);
+				assertTrue(options.getCommandLine().hasOption(Flip6DefaultCLI.FLIP_6.getOpt()));
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail("Program caused an exception: " + e.getMessage());
+		}
+	}
+
+	/**
+	 * Tests cancelling with the savepoint option.
+	 */
+	@Test
+	public void testCancelWithSavepoint() throws Exception {
+		{
+			// Cancel with savepoint (no target directory)
+			JobID jid = new JobID();
+
+			String[] parameters = { "-s", jid.toString() };
+			CancelTestCliFrontend testFrontend = new CancelTestCliFrontend(false);
+			assertEquals(0, testFrontend.cancel(parameters));
+
+			Mockito.verify(testFrontend.client, times(1))
+				.cancelWithSavepoint(any(JobID.class), isNull(String.class));
+		}
+
+		{
+			// Cancel with savepoint (with target directory)
+			JobID jid = new JobID();
+
+			String[] parameters = { "-s", "targetDirectory", jid.toString() };
+			CancelTestCliFrontend testFrontend = new CancelTestCliFrontend(false);
+			assertEquals(0, testFrontend.cancel(parameters));
+
+			Mockito.verify(testFrontend.client, times(1))
+				.cancelWithSavepoint(any(JobID.class), notNull(String.class));
+		}
+
+		{
+			// Cancel with savepoint (with target directory), but no job ID
+			String[] parameters = { "-s", "targetDirectory" };
+			CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
+			assertNotEquals(0, testFrontend.cancel(parameters));
+		}
+
+		{
+			// Cancel with savepoint (no target directory) and no job ID
+			String[] parameters = { "-s" };
+			CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
+			assertNotEquals(0, testFrontend.cancel(parameters));
+		}
+	}
+
+	private static final class CancelTestCliFrontend extends MockedCliFrontend {
+
+		CancelTestCliFrontend(boolean reject) throws Exception {
+			if (reject) {
+				doThrow(new IllegalArgumentException("Test exception")).when(client).cancel(any(JobID.class));
+				doThrow(new IllegalArgumentException("Test exception")).when(client).cancelWithSavepoint(any(JobID.class), anyString());
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e5729617/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
deleted file mode 100644
index b01d162..0000000
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
+++ /dev/null
@@ -1,310 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.client;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.client.cli.CancelOptions;
-import org.apache.flink.client.cli.CliFrontendParser;
-import org.apache.flink.client.cli.CommandLineOptions;
-import org.apache.flink.client.cli.Flip6DefaultCLI;
-import org.apache.flink.client.util.MockedCliFrontend;
-import org.apache.flink.runtime.akka.FlinkUntypedActor;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.Status;
-import akka.testkit.JavaTestKit;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.util.UUID;
-
-import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.isNull;
-import static org.mockito.Matchers.notNull;
-import static org.mockito.Mockito.times;
-import static org.powermock.api.mockito.PowerMockito.doThrow;
-
-/**
- * Tests for the CANCEL and LIST commands.
- */
-public class CliFrontendListCancelTest {
-
-	private static ActorSystem actorSystem;
-
-	@BeforeClass
-	public static void setup(){
-		pipeSystemOutToNull();
-		actorSystem = ActorSystem.create("TestingActorSystem");
-	}
-
-	@AfterClass
-	public static void teardown() {
-		JavaTestKit.shutdownActorSystem(actorSystem);
-		actorSystem = null;
-	}
-
-	@BeforeClass
-	public static void init() {
-		CliFrontendTestUtils.pipeSystemOutToNull();
-	}
-
-	@Test
-	public void testCancel() {
-		try {
-			// test unrecognized option
-			{
-				String[] parameters = {"-v", "-l"};
-				CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
-				int retCode = testFrontend.cancel(parameters);
-				assertTrue(retCode != 0);
-			}
-
-			// test missing job id
-			{
-				String[] parameters = {};
-				CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
-				int retCode = testFrontend.cancel(parameters);
-				assertTrue(retCode != 0);
-			}
-
-			// test cancel properly
-			{
-				JobID jid = new JobID();
-
-				String[] parameters = { jid.toString() };
-				CancelTestCliFrontend testFrontend = new CancelTestCliFrontend(false);
-
-				int retCode = testFrontend.cancel(parameters);
-				assertTrue(retCode == 0);
-
-				Mockito.verify(testFrontend.client, times(1)).cancel(any(JobID.class));
-			}
-
-			// test cancel properly
-			{
-				JobID jid = new JobID();
-
-				String[] parameters = { jid.toString() };
-				CancelTestCliFrontend testFrontend = new CancelTestCliFrontend(true);
-
-				int retCode = testFrontend.cancel(parameters);
-				assertTrue(retCode != 0);
-
-				Mockito.verify(testFrontend.client, times(1)).cancel(any(JobID.class));
-			}
-
-			// test flip6 switch
-			{
-				String[] parameters =
-					{"-flip6", String.valueOf(new JobID())};
-				CancelOptions options = CliFrontendParser.parseCancelCommand(parameters);
-				assertTrue(options.getCommandLine().hasOption(Flip6DefaultCLI.FLIP_6.getOpt()));
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail("Program caused an exception: " + e.getMessage());
-		}
-	}
-
-	/**
-	 * Tests cancelling with the savepoint option.
-	 */
-	@Test
-	public void testCancelWithSavepoint() throws Exception {
-		{
-			// Cancel with savepoint (no target directory)
-			JobID jid = new JobID();
-
-			String[] parameters = { "-s", jid.toString() };
-			CancelTestCliFrontend testFrontend = new CancelTestCliFrontend(false);
-			assertEquals(0, testFrontend.cancel(parameters));
-
-			Mockito.verify(testFrontend.client, times(1))
-				.cancelWithSavepoint(any(JobID.class), isNull(String.class));
-		}
-
-		{
-			// Cancel with savepoint (with target directory)
-			JobID jid = new JobID();
-
-			String[] parameters = { "-s", "targetDirectory", jid.toString() };
-			CancelTestCliFrontend testFrontend = new CancelTestCliFrontend(false);
-			assertEquals(0, testFrontend.cancel(parameters));
-
-			Mockito.verify(testFrontend.client, times(1))
-				.cancelWithSavepoint(any(JobID.class), notNull(String.class));
-		}
-
-		{
-			// Cancel with savepoint (with target directory), but no job ID
-			String[] parameters = { "-s", "targetDirectory" };
-			CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
-			assertNotEquals(0, testFrontend.cancel(parameters));
-		}
-
-		{
-			// Cancel with savepoint (no target directory) and no job ID
-			String[] parameters = { "-s" };
-			CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
-			assertNotEquals(0, testFrontend.cancel(parameters));
-		}
-	}
-
-	@Test
-	public void testList() {
-		try {
-			// test unrecognized option
-			{
-				String[] parameters = {"-v", "-k"};
-				CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
-				int retCode = testFrontend.list(parameters);
-				assertTrue(retCode != 0);
-			}
-
-			// test list properly
-			{
-				final UUID leaderSessionID = UUID.randomUUID();
-				final ActorRef jm = actorSystem.actorOf(
-						Props.create(
-								CliJobManager.class,
-								null,
-								leaderSessionID
-						)
-				);
-				final ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID);
-				String[] parameters = {"-r", "-s"};
-				InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(gateway);
-				int retCode = testFrontend.list(parameters);
-				assertTrue(retCode == 0);
-			}
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail("Program caused an exception: " + e.getMessage());
-		}
-	}
-
-	private static final class CancelTestCliFrontend extends MockedCliFrontend {
-
-		CancelTestCliFrontend(boolean reject) throws Exception {
-			if (reject) {
-				doThrow(new IllegalArgumentException("Test exception")).when(client).cancel(any(JobID.class));
-				doThrow(new IllegalArgumentException("Test exception")).when(client).cancelWithSavepoint(any(JobID.class), anyString());
-			}
-		}
-	}
-
-	private static final class InfoListTestCliFrontend extends CliFrontend {
-
-		private ActorGateway jobManagerGateway;
-
-		InfoListTestCliFrontend(ActorGateway jobManagerGateway) throws Exception {
-			super(CliFrontendTestUtils.getConfigDir());
-			this.jobManagerGateway = jobManagerGateway;
-		}
-
-		@Override
-		public ActorGateway getJobManagerGateway(CommandLineOptions options) {
-			return jobManagerGateway;
-		}
-	}
-
-	private static final class CliJobManager extends FlinkUntypedActor {
-		private final JobID jobID;
-		private final UUID leaderSessionID;
-		private final String targetDirectory;
-
-		public CliJobManager(final JobID jobID, final UUID leaderSessionID){
-			this(jobID, leaderSessionID, null);
-		}
-
-		public CliJobManager(final JobID jobID, final UUID leaderSessionID, String targetDirectory){
-			this.jobID = jobID;
-			this.leaderSessionID = leaderSessionID;
-			this.targetDirectory = targetDirectory;
-		}
-
-		@Override
-		public void handleMessage(Object message) {
-			if (message instanceof JobManagerMessages.RequestTotalNumberOfSlots$) {
-				getSender().tell(decorateMessage(1), getSelf());
-			}
-			else if (message instanceof JobManagerMessages.CancelJob) {
-				JobManagerMessages.CancelJob cancelJob = (JobManagerMessages.CancelJob) message;
-
-				if (jobID != null && jobID.equals(cancelJob.jobID())) {
-					getSender().tell(
-							decorateMessage(new Status.Success(new JobManagerMessages.CancellationSuccess(jobID, null))),
-							getSelf());
-				}
-				else {
-					getSender().tell(
-							decorateMessage(new Status.Failure(new Exception("Wrong or no JobID"))),
-							getSelf());
-				}
-			}
-			else if (message instanceof JobManagerMessages.CancelJobWithSavepoint) {
-				JobManagerMessages.CancelJobWithSavepoint cancelJob = (JobManagerMessages.CancelJobWithSavepoint) message;
-
-				if (jobID != null && jobID.equals(cancelJob.jobID())) {
-					if (targetDirectory == null && cancelJob.savepointDirectory() == null ||
-							targetDirectory != null && targetDirectory.equals(cancelJob.savepointDirectory())) {
-						getSender().tell(
-								decorateMessage(new JobManagerMessages.CancellationSuccess(jobID, targetDirectory)),
-								getSelf());
-					} else {
-						getSender().tell(
-								decorateMessage(new JobManagerMessages.CancellationFailure(jobID, new Exception("Wrong target directory"))),
-								getSelf());
-					}
-				}
-				else {
-					getSender().tell(
-							decorateMessage(new JobManagerMessages.CancellationFailure(jobID, new Exception("Wrong or no JobID"))),
-							getSelf());
-				}
-			}
-			else if (message instanceof JobManagerMessages.RequestRunningJobsStatus$) {
-				getSender().tell(
-						decorateMessage(new JobManagerMessages.RunningJobsStatus()),
-						getSelf());
-			}
-		}
-
-		@Override
-		protected UUID getLeaderSessionID() {
-			return leaderSessionID;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e5729617/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListTest.java
new file mode 100644
index 0000000..b559af1
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client;
+
+import org.apache.flink.client.util.MockedCliFrontend;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.times;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for the LIST command.
+ */
+public class CliFrontendListTest extends TestLogger {
+
+	@BeforeClass
+	public static void init() {
+		CliFrontendTestUtils.pipeSystemOutToNull();
+	}
+
+	@Test
+	public void testList() throws Exception {
+		// test unrecognized option
+		{
+			String[] parameters = {"-v", "-k"};
+			CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
+			int retCode = testFrontend.list(parameters);
+			assertTrue(retCode != 0);
+		}
+
+		// test list properly
+		{
+			String[] parameters = {"-r", "-s"};
+			ListTestCliFrontend testFrontend = new ListTestCliFrontend();
+			int retCode = testFrontend.list(parameters);
+			assertTrue(retCode == 0);
+			Mockito.verify(testFrontend.client, times(1))
+				.listJobs();
+		}
+	}
+
+	private static final class ListTestCliFrontend extends MockedCliFrontend {
+
+		ListTestCliFrontend() throws Exception {
+			when(client.listJobs()).thenReturn(CompletableFuture.completedFuture(Collections.emptyList()));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e5729617/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
index 5f6d9fe..ec73d76 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
@@ -21,15 +21,23 @@ package org.apache.flink.client.program;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.DummyActorGateway;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.concurrent.CompletableFuture;
 
 import scala.concurrent.Future;
@@ -134,6 +142,27 @@ public class ClusterClientTest extends TestLogger {
 		}
 	}
 
+	@Test
+	public void testClusterClientList() throws Exception {
+		Configuration config = new Configuration();
+		config.setString(JobManagerOptions.ADDRESS, "localhost");
+
+		TestListActorGateway gateway = new TestListActorGateway();
+		ClusterClient clusterClient = new TestClusterClient(config, gateway);
+		try {
+			CompletableFuture<Collection<JobStatusMessage>> jobDetailsFuture = clusterClient.listJobs();
+			Collection<JobStatusMessage> jobDetails = jobDetailsFuture.get();
+			Assert.assertTrue(gateway.messageArrived);
+			Assert.assertEquals(2, jobDetails.size());
+			Iterator<JobStatusMessage> jobDetailsIterator = jobDetails.iterator();
+			JobStatusMessage job1 = jobDetailsIterator.next();
+			JobStatusMessage job2 = jobDetailsIterator.next();
+			Assert.assertNotEquals("The job statuses should not be equal.", job1.getJobState(), job2.getJobState());
+		} finally {
+			clusterClient.shutdown();
+		}
+	}
+
 	private static class TestStopActorGateway extends DummyActorGateway {
 
 		private final JobID expectedJobID;
@@ -218,6 +247,20 @@ public class ClusterClientTest extends TestLogger {
 		}
 	}
 
+	private static class TestListActorGateway extends TestActorGateway<RequestJobDetails, MultipleJobsDetails> {
+
+		TestListActorGateway() {
+			super(RequestJobDetails.class);
+		}
+
+		@Override
+		public MultipleJobsDetails process(RequestJobDetails message) {
+			JobDetails running = new JobDetails(new JobID(), "job1", 0, 0, 0, JobStatus.RUNNING, 0, new int[9], 0);
+			JobDetails finished = new JobDetails(new JobID(), "job2", 0, 0, 0, JobStatus.FINISHED, 0, new int[9], 0);
+			return new MultipleJobsDetails(Collections.singleton(running), Collections.singleton(finished));
+		}
+	}
+
 	private static class TestClusterClient extends StandaloneClusterClient {
 
 		private final ActorGateway jobmanagerGateway;


[5/5] flink git commit: [FLINK-7791] [REST][client] Integrate LIST command into RestClusterClient

Posted by ch...@apache.org.
[FLINK-7791] [REST][client] Integrate LIST command into RestClusterClient

This closes #4802.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a76de286
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a76de286
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a76de286

Branch: refs/heads/master
Commit: a76de2860d2ee78e673c97fba3394f099389ad75
Parents: e572961
Author: zentol <ch...@apache.org>
Authored: Wed Oct 11 13:58:59 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sun Oct 15 16:43:58 2017 +0200

----------------------------------------------------------------------
 .../client/program/rest/RestClusterClient.java  | 22 +++++++
 .../program/rest/RestClusterClientTest.java     | 61 ++++++++++++++++++++
 2 files changed, 83 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a76de286/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 8ae18af..3916514 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -27,12 +27,15 @@ import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.RestClient;
 import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
 import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
 import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
 import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
@@ -49,6 +52,8 @@ import javax.annotation.Nullable;
 
 import java.net.InetSocketAddress;
 import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -192,6 +197,23 @@ public class RestClusterClient extends ClusterClient {
 			.thenApply(response -> response.location);
 	}
 
+	@Override
+	public CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception {
+		CurrentJobsOverviewHandlerHeaders headers = CurrentJobsOverviewHandlerHeaders.getInstance();
+		CompletableFuture<MultipleJobsDetails> jobDetailsFuture = restClient.sendRequest(
+			restClusterClientConfiguration.getRestServerAddress(),
+			restClusterClientConfiguration.getRestServerPort(),
+			headers
+		);
+		return jobDetailsFuture
+			.thenApply(details -> {
+				Collection<JobStatusMessage> flattenedDetails = new ArrayList<>();
+				details.getRunning().forEach(detail -> flattenedDetails.add(new JobStatusMessage(detail.getJobId(), detail.getJobName(), detail.getStatus(), detail.getStartTime())));
+				details.getFinished().forEach(detail -> flattenedDetails.add(new JobStatusMessage(detail.getJobId(), detail.getJobName(), detail.getStatus(), detail.getStartTime())));
+				return flattenedDetails;
+			});
+	}
+
 	// ======================================
 	// Legacy stuff we actually implement
 	// ======================================
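
Reviewer sketch (not part of the commit): the listJobs() hunk above merges the
running and finished collections of the overview response into one collection
once the future completes. The stand-alone example below illustrates that
pattern with plain JDK types only. Detail, Overview and the "name:status"
string format are hypothetical stand-ins chosen for illustration; they are not
Flink's JobDetails, MultipleJobsDetails or JobStatusMessage.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

/** Stand-alone sketch; none of the types below are Flink classes. */
public class ListJobsFlattenSketch {

    /** Hypothetical stand-in for a single job entry in the response. */
    static final class Detail {
        final String name;
        final String status;

        Detail(String name, String status) {
            this.name = name;
            this.status = status;
        }
    }

    /** Hypothetical stand-in for a response that keeps running and finished jobs apart. */
    static final class Overview {
        final Collection<Detail> running;
        final Collection<Detail> finished;

        Overview(Collection<Detail> running, Collection<Detail> finished) {
            this.running = running;
            this.finished = finished;
        }
    }

    /** Flattens both collections into one, running jobs first, once the response arrives. */
    static CompletableFuture<Collection<String>> listJobs(CompletableFuture<Overview> response) {
        return response.thenApply(overview -> {
            Collection<String> flattened = new ArrayList<>();
            overview.running.forEach(d -> flattened.add(d.name + ":" + d.status));
            overview.finished.forEach(d -> flattened.add(d.name + ":" + d.status));
            return flattened;
        });
    }

    public static void main(String[] args) throws Exception {
        Overview overview = new Overview(
            Collections.singleton(new Detail("job1", "RUNNING")),
            Collections.singleton(new Detail("job2", "FINISHED")));

        // The future is already complete here, so get() returns immediately.
        Collection<String> jobs = listJobs(CompletableFuture.completedFuture(overview)).get();
        System.out.println(jobs); // prints [job1:RUNNING, job2:FINISHED]
    }
}
```

Because the mapping runs inside thenApply, the caller still receives a future
and decides when to block, which matches how the tests in this commit call
get() on the result of listJobs().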

http://git-wip-us.apache.org/repos/asf/flink/blob/a76de286/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 00c37a6..cd6fc0c 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -23,9 +23,13 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.RestServerEndpoint;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
@@ -34,6 +38,7 @@ import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
 import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
@@ -64,6 +69,8 @@ import javax.annotation.Nonnull;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -74,6 +81,9 @@ import static org.mockito.Mockito.when;
 
 /**
  * Tests for the {@link RestClusterClient}.
+ *
+ * <p>These tests verify that the client uses the appropriate headers for each
+ * request, properly constructs the request bodies/parameters and processes the responses correctly.
  */
 public class RestClusterClientTest extends TestLogger {
 
@@ -257,6 +267,57 @@ public class RestClusterClientTest extends TestLogger {
 		}
 	}
 
+	@Test
+	public void testListJobs() throws Exception {
+
+		Configuration config = new Configuration();
+		config.setString(JobManagerOptions.ADDRESS, "localhost");
+
+		RestServerEndpointConfiguration rsec = RestServerEndpointConfiguration.fromConfiguration(config);
+
+		TestListJobsHandler listJobsHandler = new TestListJobsHandler();
+
+		RestServerEndpoint rse = new RestServerEndpoint(rsec) {
+			@Override
+			protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
+
+				Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>();
+				handlers.add(Tuple2.of(listJobsHandler.getMessageHeaders(), listJobsHandler));
+				return handlers;
+			}
+		};
+
+		RestClusterClient rcc = new RestClusterClient(config);
+		try {
+			rse.start();
+
+			{
+				CompletableFuture<Collection<JobStatusMessage>> jobDetailsFuture = rcc.listJobs();
+				Collection<JobStatusMessage> jobDetails = jobDetailsFuture.get();
+				Iterator<JobStatusMessage> jobDetailsIterator = jobDetails.iterator();
+				JobStatusMessage job1 = jobDetailsIterator.next();
+				JobStatusMessage job2 = jobDetailsIterator.next();
+				Assert.assertNotEquals("The job statuses should not be equal.", job1.getJobState(), job2.getJobState());
+			}
+		} finally {
+			rcc.shutdown();
+			rse.shutdown(Time.seconds(5));
+		}}
+
+	private static class TestListJobsHandler extends TestHandler<EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters> {
+
+		private TestListJobsHandler() {
+			super(CurrentJobsOverviewHandlerHeaders.getInstance());
+		}
+
+		@Override
+		protected CompletableFuture<MultipleJobsDetails> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
+			JobDetails running = new JobDetails(new JobID(), "job1", 0, 0, 0, JobStatus.RUNNING, 0, new int[9], 0);
+			JobDetails finished = new JobDetails(new JobID(), "job2", 0, 0, 0, JobStatus.FINISHED, 0, new int[9], 0);
+			return CompletableFuture.completedFuture(new MultipleJobsDetails(Collections.singleton(running), Collections.singleton(finished)));
+		}
+	}
+
 	private abstract static class TestHandler<R extends RequestBody, P extends ResponseBody, M extends MessageParameters> extends AbstractRestHandler<DispatcherGateway, R, P, M> {
 
 		private TestHandler(MessageHeaders<R, P, M> headers) {


[2/5] flink git commit: [FLINK-6703][docs] Document how to take a savepoint on YARN

Posted by ch...@apache.org.
[FLINK-6703][docs] Document how to take a savepoint on YARN

This closes #4721.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/89394ec8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/89394ec8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/89394ec8

Branch: refs/heads/master
Commit: 89394ec8094205cb3fd3a47a95dacd1b18c31088
Parents: 4b6f055
Author: Bowen Li <bo...@gmail.com>
Authored: Sun Sep 24 21:56:58 2017 -0700
Committer: zentol <ch...@apache.org>
Committed: Sun Oct 15 16:40:53 2017 +0200

----------------------------------------------------------------------
 docs/ops/cli.md              | 23 ++++++++++++++++++-----
 docs/ops/state/savepoints.md | 18 +++++++++++++++---
 2 files changed, 33 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/89394ec8/docs/ops/cli.md
----------------------------------------------------------------------
diff --git a/docs/ops/cli.md b/docs/ops/cli.md
index 11c8caf..4f80b08 100644
--- a/docs/ops/cli.md
+++ b/docs/ops/cli.md
@@ -138,17 +138,30 @@ This allows the job to finish processing all inflight data.
 
 [Savepoints]({{site.baseurl}}/ops/state/savepoints.html) are controlled via the command line client:
 
-#### Trigger a savepoint
+#### Trigger a Savepoint
 
 {% highlight bash %}
-./bin/flink savepoint <jobID> [savepointDirectory]
+./bin/flink savepoint <jobId> [savepointDirectory]
 {% endhighlight %}
 
-Returns the path of the created savepoint. You need this path to restore and dispose savepoints.
+This triggers a savepoint for the job with ID `jobId` and returns the path of the created savepoint. You need this path to restore and dispose savepoints.
 
-You can optionally specify a `savepointDirectory` when triggering the savepoint. If you don't specify one here, you need to configure a default savepoint directory for the Flink installation (see [Savepoints]({{site.baseurl}}/ops/state/savepoints.html#configuration)).
 
-##### Cancel with a savepoint
+Furthermore, you can optionally specify a target file system directory to store the savepoint in. The directory needs to be accessible by the JobManager.
+
+If you don't specify a target directory, you need to have [configured a default directory]({{site.baseurl}}/ops/state/savepoints.html#configuration). Otherwise, triggering the savepoint will fail.
+
+#### Trigger a Savepoint with YARN
+
+{% highlight bash %}
+./bin/flink savepoint <jobId> [savepointDirectory] -yid <yarnAppId>
+{% endhighlight %}
+
+This triggers a savepoint for the job with ID `jobId` and YARN application ID `yarnAppId`, and returns the path of the created savepoint.
+
+Everything else is the same as described in the above **Trigger a Savepoint** section.
+
+#### Cancel with a savepoint
 
 You can atomically trigger a savepoint and cancel a job.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/89394ec8/docs/ops/state/savepoints.md
----------------------------------------------------------------------
diff --git a/docs/ops/state/savepoints.md b/docs/ops/state/savepoints.md
index d6d4c53..057469a 100644
--- a/docs/ops/state/savepoints.md
+++ b/docs/ops/state/savepoints.md
@@ -103,12 +103,24 @@ Note that if you use the `MemoryStateBackend`, metadata *and* savepoint state wi
 #### Trigger a Savepoint
 
 ```sh
-$ bin/flink savepoint :jobId [:targetDirectory]
+$ bin/flink savepoint :jobId [:savepointDirectory]
 ```
 
-This will trigger a savepoint for the job with ID `:jobid`. Furthermore, you can specify a target file system directory to store the savepoint in. The directory needs to be accessible by the JobManager.
+This triggers a savepoint for the job with ID `:jobId` and returns the path of the created savepoint. You need this path to restore and dispose savepoints.
 
-If you don't specify a target directory, you need to have [configured a default directory](#configuration). Otherwise, triggering the savepoint will fail.
+Furthermore, you can optionally specify a target file system directory to store the savepoint in. The directory needs to be accessible by the JobManager.
+
+If you don't specify a target directory, you need to have [configured a default directory](#configuration). Otherwise, triggering the savepoint will fail.
+
+#### Trigger a Savepoint with YARN
+
+```sh
+$ bin/flink savepoint :jobId [:savepointDirectory] -yid :yarnAppId
+```
+
+This triggers a savepoint for the job with ID `:jobId` and YARN application ID `:yarnAppId`, and returns the path of the created savepoint.
+
+Everything else is the same as described in the above **Trigger a Savepoint** section.
 
 #### Cancel Job with Savepoint