You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/10/11 11:53:54 UTC

[1/5] flink git commit: [refactor] [tests] Generalize test handler generation

Repository: flink
Updated Branches:
  refs/heads/master 367e430e5 -> 7697a3254


[refactor] [tests] Generalize test handler generation


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/90eb9028
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/90eb9028
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/90eb9028

Branch: refs/heads/master
Commit: 90eb902830749ab30e85c5c3484c0f3e65321e18
Parents: c4430e6
Author: zentol <ch...@apache.org>
Authored: Tue Oct 10 13:23:21 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Oct 11 11:56:59 2017 +0200

----------------------------------------------------------------------
 .../program/rest/RestClusterClientTest.java     | 39 +++++++++++---------
 1 file changed, 21 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/90eb9028/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 617dd38..7c71fd0 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -39,6 +39,10 @@ import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
 import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
 import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
@@ -128,15 +132,11 @@ public class RestClusterClientTest extends TestLogger {
 		}
 	}
 
-	private static class TestBlobServerPortHandler extends AbstractRestHandler<DispatcherGateway, EmptyRequestBody, BlobServerPortResponseBody, EmptyMessageParameters> {
+	private static class TestBlobServerPortHandler extends TestHandler<EmptyRequestBody, BlobServerPortResponseBody, EmptyMessageParameters> {
 		private volatile boolean portRetrieved = false;
 
 		private TestBlobServerPortHandler() {
-			super(
-				CompletableFuture.completedFuture(restAddress),
-				mockGatewayRetriever,
-				RpcUtils.INF_TIMEOUT,
-				BlobServerPortHeaders.getInstance());
+			super(BlobServerPortHeaders.getInstance());
 		}
 
 		@Override
@@ -146,15 +146,11 @@ public class RestClusterClientTest extends TestLogger {
 		}
 	}
 
-	private static class TestJobSubmitHandler extends AbstractRestHandler<DispatcherGateway, JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> {
+	private static class TestJobSubmitHandler extends TestHandler<JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> {
 		private volatile boolean jobSubmitted = false;
 
 		private TestJobSubmitHandler() {
-			super(
-				CompletableFuture.completedFuture(restAddress),
-				mockGatewayRetriever,
-				RpcUtils.INF_TIMEOUT,
-				JobSubmitHeaders.getInstance());
+			super(JobSubmitHeaders.getInstance());
 		}
 
 		@Override
@@ -164,16 +160,12 @@ public class RestClusterClientTest extends TestLogger {
 		}
 	}
 
-	private static class TestJobTerminationHandler extends AbstractRestHandler<DispatcherGateway, EmptyRequestBody, EmptyResponseBody, JobTerminationMessageParameters> {
+	private static class TestJobTerminationHandler extends TestHandler<EmptyRequestBody, EmptyResponseBody, JobTerminationMessageParameters> {
 		private volatile boolean jobCanceled = false;
 		private volatile boolean jobStopped = false;
 
 		private TestJobTerminationHandler() {
-			super(
-				CompletableFuture.completedFuture(restAddress),
-				mockGatewayRetriever,
-				RpcUtils.INF_TIMEOUT,
-				JobTerminationHeaders.getInstance());
+			super(JobTerminationHeaders.getInstance());
 		}
 
 		@Override
@@ -189,4 +181,15 @@ public class RestClusterClientTest extends TestLogger {
 			return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
 		}
 	}
+
+	private abstract static class TestHandler<R extends RequestBody, P extends ResponseBody, M extends MessageParameters> extends AbstractRestHandler<DispatcherGateway, R, P, M> {
+
+		private TestHandler(MessageHeaders<R, P, M> headers) {
+			super(
+				CompletableFuture.completedFuture(restAddress),
+				mockGatewayRetriever,
+				RpcUtils.INF_TIMEOUT,
+				headers);
+		}
+	}
 }


[4/5] flink git commit: [FLINK-7780] [Client] Move savepoint logic into ClusterClient

Posted by ch...@apache.org.
[FLINK-7780] [Client] Move savepoint logic into ClusterClient


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e8e1e330
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e8e1e330
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e8e1e330

Branch: refs/heads/master
Commit: e8e1e330a62bcdad939c896ab807362cc346278b
Parents: 90eb902
Author: zentol <ch...@apache.org>
Authored: Mon Oct 9 13:34:52 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Oct 11 11:58:54 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    |  49 ++++----
 .../flink/client/program/ClusterClient.java     |  34 +++++
 .../flink/client/CliFrontendSavepointTest.java  | 126 ++++++-------------
 .../flink/client/program/ClusterClientTest.java |  58 ++++++++-
 4 files changed, 144 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e8e1e330/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 9be8295..c065453 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -61,11 +61,11 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
-import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
@@ -89,16 +89,15 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
-import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
 import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure;
-import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointFailure;
 
 /**
  * Implementation of a simple command line frontend for executing programs.
@@ -726,35 +725,29 @@ public class CliFrontend {
 	 */
 	private int triggerSavepoint(SavepointOptions options, JobID jobId, String savepointDirectory) {
 		try {
-			ActorGateway jobManager = getJobManagerGateway(options);
-
-			logAndSysout("Triggering savepoint for job " + jobId + ".");
-			Future<Object> response = jobManager.ask(new TriggerSavepoint(jobId, Option.apply(savepointDirectory)),
-					new FiniteDuration(1, TimeUnit.HOURS));
-
-			Object result;
+			CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(options.getCommandLine());
+			ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory);
 			try {
-				logAndSysout("Waiting for response...");
-				result = Await.result(response, FiniteDuration.Inf());
-			}
-			catch (Exception e) {
-				throw new Exception("Triggering a savepoint for the job " + jobId + " failed.", e);
-			}
+				logAndSysout("Triggering savepoint for job " + jobId + ".");
+				CompletableFuture<String> savepointPathFuture = client.triggerSavepoint(jobId, savepointDirectory);
 
-			if (result instanceof TriggerSavepointSuccess) {
-				TriggerSavepointSuccess success = (TriggerSavepointSuccess) result;
-				logAndSysout("Savepoint completed. Path: " + success.savepointPath());
+				String savepointPath;
+				try {
+					logAndSysout("Waiting for response...");
+					savepointPath = savepointPathFuture.get();
+				}
+				catch (ExecutionException ee) {
+					Throwable cause = ExceptionUtils.stripExecutionException(ee);
+					throw new FlinkException("Triggering a savepoint for the job " + jobId + " failed.", cause);
+				}
+
+				logAndSysout("Savepoint completed. Path: " + savepointPath);
 				logAndSysout("You can resume your program from this savepoint with the run command.");
 
 				return 0;
 			}
-			else if (result instanceof TriggerSavepointFailure) {
-				TriggerSavepointFailure failure = (TriggerSavepointFailure) result;
-				throw failure.cause();
-			}
-			else {
-				throw new IllegalStateException("Unknown JobManager response of type " +
-						result.getClass());
+			finally {
+				client.shutdown();
 			}
 		}
 		catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e8e1e330/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 78455c1..eb89f09 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobListeningContext;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -72,6 +73,9 @@ import java.net.URL;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
 
 import scala.Option;
 import scala.Tuple2;
@@ -650,6 +654,36 @@ public abstract class ClusterClient {
 	}
 
 	/**
+	 * Triggers a savepoint for the job identified by the job id. The savepoint will be written to the given savepoint
+	 * directory, or {@link org.apache.flink.configuration.CoreOptions#SAVEPOINT_DIRECTORY} if it is null.
+	 *
+	 * @param jobId job id
+	 * @param savepointDirectory directory the savepoint should be written to
+	 * @return path future where the savepoint is located
+	 * @throws Exception if  no connection to the cluster could be established
+	 */
+	public CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory) throws Exception {
+		final ActorGateway jobManager = getJobManagerGateway();
+
+		Future<Object> response = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobId, Option.apply(savepointDirectory)),
+			new FiniteDuration(1, TimeUnit.HOURS));
+		CompletableFuture<Object> responseFuture = FutureUtils.toJava(response);
+
+		return responseFuture.thenApply((responseMessage) -> {
+			if (responseMessage instanceof JobManagerMessages.TriggerSavepointSuccess) {
+				JobManagerMessages.TriggerSavepointSuccess success = (JobManagerMessages.TriggerSavepointSuccess) responseMessage;
+				return success.savepointPath();
+			} else if (responseMessage instanceof JobManagerMessages.TriggerSavepointFailure) {
+				JobManagerMessages.TriggerSavepointFailure failure = (JobManagerMessages.TriggerSavepointFailure) responseMessage;
+				throw new CompletionException(failure.cause());
+			} else {
+				throw new CompletionException(
+					new IllegalStateException("Unknown JobManager response of type " + responseMessage.getClass()));
+			}
+		});
+	}
+
+	/**
 	 * Requests and returns the accumulators for the given job identifier. Accumulators can be
 	 * requested while a is running or after it has finished. The default class loader is used
 	 * to deserialize the incoming accumulator results.

http://git-wip-us.apache.org/repos/asf/flink/blob/e8e1e330/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
index cfed859..1f0d356 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.client;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.cli.CommandLineOptions;
+import org.apache.flink.client.util.MockedCliFrontend;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 
@@ -33,22 +35,23 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
+import java.util.concurrent.CompletableFuture;
 import java.util.zip.ZipOutputStream;
 
-import scala.Option;
 import scala.concurrent.Future;
 import scala.concurrent.Promise;
 import scala.concurrent.duration.FiniteDuration;
 
 import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
 import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure;
-import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
-import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointFailure;
-import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
 import static org.apache.flink.runtime.messages.JobManagerMessages.getDisposeSavepointSuccess;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -76,31 +79,19 @@ public class CliFrontendSavepointTest {
 
 		try {
 			JobID jobId = new JobID();
-			ActorGateway jobManager = mock(ActorGateway.class);
-
-			Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>();
-
-			when(jobManager.ask(
-					Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())),
-					any(FiniteDuration.class)))
-					.thenReturn(triggerResponse.future());
 
 			String savepointPath = "expectedSavepointPath";
 
-			triggerResponse.success(new TriggerSavepointSuccess(jobId, -1, savepointPath, -1));
-
-			CliFrontend frontend = new MockCliFrontend(
-					CliFrontendTestUtils.getConfigDir(), jobManager);
+			MockedCliFrontend frontend = new SavepointTestCliFrontend(savepointPath);
 
 			String[] parameters = { jobId.toString() };
 			int returnCode = frontend.savepoint(parameters);
 
 			assertEquals(0, returnCode);
-			verify(jobManager, times(1)).ask(
-					Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())),
-					any(FiniteDuration.class));
+			verify(frontend.client, times(1))
+				.triggerSavepoint(eq(jobId), isNull(String.class));
 
-			assertTrue(buffer.toString().contains("expectedSavepointPath"));
+			assertTrue(buffer.toString().contains(savepointPath));
 		}
 		finally {
 			restoreStdOutAndStdErr();
@@ -113,29 +104,17 @@ public class CliFrontendSavepointTest {
 
 		try {
 			JobID jobId = new JobID();
-			ActorGateway jobManager = mock(ActorGateway.class);
-
-			Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>();
-
-			when(jobManager.ask(
-					Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())),
-					any(FiniteDuration.class)))
-					.thenReturn(triggerResponse.future());
 
 			Exception testException = new Exception("expectedTestException");
 
-			triggerResponse.success(new TriggerSavepointFailure(jobId, testException));
-
-			CliFrontend frontend = new MockCliFrontend(
-					CliFrontendTestUtils.getConfigDir(), jobManager);
+			MockedCliFrontend frontend = new SavepointTestCliFrontend(testException);
 
 			String[] parameters = { jobId.toString() };
 			int returnCode = frontend.savepoint(parameters);
 
-			assertTrue(returnCode != 0);
-			verify(jobManager, times(1)).ask(
-					Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())),
-					any(FiniteDuration.class));
+			assertNotEquals(0, returnCode);
+			verify(frontend.client, times(1))
+				.triggerSavepoint(eq(jobId), isNull(String.class));
 
 			assertTrue(buffer.toString().contains("expectedTestException"));
 		}
@@ -162,46 +141,9 @@ public class CliFrontendSavepointTest {
 		}
 	}
 
-	@Test
-	public void testTriggerSavepointFailureUnknownResponse() throws Exception {
-		replaceStdOutAndStdErr();
-
-		try {
-			JobID jobId = new JobID();
-			ActorGateway jobManager = mock(ActorGateway.class);
-
-			Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>();
-
-			when(jobManager.ask(
-					Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())),
-					any(FiniteDuration.class)))
-					.thenReturn(triggerResponse.future());
-
-			triggerResponse.success("UNKNOWN RESPONSE");
-
-			CliFrontend frontend = new MockCliFrontend(
-					CliFrontendTestUtils.getConfigDir(), jobManager);
-
-			String[] parameters = { jobId.toString() };
-			int returnCode = frontend.savepoint(parameters);
-
-			assertTrue(returnCode != 0);
-			verify(jobManager, times(1)).ask(
-					Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())),
-					any(FiniteDuration.class));
-
-			String errMsg = buffer.toString();
-			assertTrue(errMsg.contains("IllegalStateException"));
-			assertTrue(errMsg.contains("Unknown JobManager response"));
-		}
-		finally {
-			restoreStdOutAndStdErr();
-		}
-	}
-
 	/**
 	 * Tests that a CLI call with a custom savepoint directory target is
-	 * forwarded correctly to the JM.
+	 * forwarded correctly to the cluster client.
 	 */
 	@Test
 	public void testTriggerSavepointCustomTarget() throws Exception {
@@ -209,30 +151,19 @@ public class CliFrontendSavepointTest {
 
 		try {
 			JobID jobId = new JobID();
-			Option<String> customTarget = Option.apply("customTargetDirectory");
-			ActorGateway jobManager = mock(ActorGateway.class);
 
-			Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>();
+			String savepointDirectory = "customTargetDirectory";
 
-			when(jobManager.ask(
-					Mockito.eq(new TriggerSavepoint(jobId, customTarget)),
-					any(FiniteDuration.class)))
-					.thenReturn(triggerResponse.future());
-			String savepointPath = "expectedSavepointPath";
-			triggerResponse.success(new TriggerSavepointSuccess(jobId, -1, savepointPath, -1));
+			MockedCliFrontend frontend = new SavepointTestCliFrontend(savepointDirectory);
 
-			CliFrontend frontend = new MockCliFrontend(
-					CliFrontendTestUtils.getConfigDir(), jobManager);
-
-			String[] parameters = { jobId.toString(), customTarget.get() };
+			String[] parameters = { jobId.toString(), savepointDirectory };
 			int returnCode = frontend.savepoint(parameters);
 
 			assertEquals(0, returnCode);
-			verify(jobManager, times(1)).ask(
-					Mockito.eq(new TriggerSavepoint(jobId, customTarget)),
-					any(FiniteDuration.class));
+			verify(frontend.client, times(1))
+				.triggerSavepoint(eq(jobId), eq(savepointDirectory));
 
-			assertTrue(buffer.toString().contains("expectedSavepointPath"));
+			assertTrue(buffer.toString().contains(savepointDirectory));
 		}
 		finally {
 			restoreStdOutAndStdErr();
@@ -444,4 +375,17 @@ public class CliFrontendSavepointTest {
 		System.setOut(stdOut);
 		System.setErr(stdErr);
 	}
+
+	private static final class SavepointTestCliFrontend extends MockedCliFrontend {
+
+		SavepointTestCliFrontend(String expectedResponse) throws Exception {
+			when(client.triggerSavepoint(any(JobID.class), anyString()))
+				.thenReturn(CompletableFuture.completedFuture(expectedResponse));
+		}
+
+		SavepointTestCliFrontend(Exception expectedException) throws Exception {
+			when(client.triggerSavepoint(any(JobID.class), anyString()))
+				.thenReturn(FutureUtils.completedExceptionally(expectedException));
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e8e1e330/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
index ad34864..5f6d9fe 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
@@ -30,6 +30,8 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.concurrent.CompletableFuture;
+
 import scala.concurrent.Future;
 import scala.concurrent.Future$;
 import scala.concurrent.duration.FiniteDuration;
@@ -100,12 +102,33 @@ public class ClusterClientTest extends TestLogger {
 		config.setString(JobManagerOptions.ADDRESS, "localhost");
 
 		JobID jobID = new JobID();
+		String savepointDirectory = "/test/directory";
+		String savepointPath = "/test/path";
+		TestCancelWithSavepointActorGateway gateway = new TestCancelWithSavepointActorGateway(jobID, savepointDirectory, savepointPath);
+		ClusterClient clusterClient = new TestClusterClient(config, gateway);
+		try {
+			String path = clusterClient.cancelWithSavepoint(jobID, savepointDirectory);
+			Assert.assertTrue(gateway.messageArrived);
+			Assert.assertEquals(savepointPath, path);
+		} finally {
+			clusterClient.shutdown();
+		}
+	}
+
+	@Test
+	public void testClusterClientSavepoint() throws Exception {
+		Configuration config = new Configuration();
+		config.setString(JobManagerOptions.ADDRESS, "localhost");
+
+		JobID jobID = new JobID();
+		String savepointDirectory = "/test/directory";
 		String savepointPath = "/test/path";
-		TestCancelWithSavepointActorGateway gateway = new TestCancelWithSavepointActorGateway(jobID, savepointPath);
+		TestSavepointActorGateway gateway = new TestSavepointActorGateway(jobID, savepointDirectory, savepointPath);
 		ClusterClient clusterClient = new TestClusterClient(config, gateway);
 		try {
-			clusterClient.cancelWithSavepoint(jobID, savepointPath);
+			CompletableFuture<String> pathFuture = clusterClient.triggerSavepoint(jobID, savepointDirectory);
 			Assert.assertTrue(gateway.messageArrived);
+			Assert.assertEquals(savepointPath, pathFuture.get());
 		} finally {
 			clusterClient.shutdown();
 		}
@@ -153,18 +176,45 @@ public class ClusterClientTest extends TestLogger {
 
 		private final JobID expectedJobID;
 		private final String expectedTargetDirectory;
+		private final String savepointPathToReturn;
 
-		TestCancelWithSavepointActorGateway(JobID expectedJobID, String expectedTargetDirectory) {
+		TestCancelWithSavepointActorGateway(JobID expectedJobID, String expectedTargetDirectory, String savepointPathToReturn) {
 			super(JobManagerMessages.CancelJobWithSavepoint.class);
 			this.expectedJobID = expectedJobID;
 			this.expectedTargetDirectory = expectedTargetDirectory;
+			this.savepointPathToReturn = savepointPathToReturn;
 		}
 
 		@Override
 		public JobManagerMessages.CancellationSuccess process(JobManagerMessages.CancelJobWithSavepoint message) {
 			Assert.assertEquals(expectedJobID, message.jobID());
 			Assert.assertEquals(expectedTargetDirectory, message.savepointDirectory());
-			return new JobManagerMessages.CancellationSuccess(message.jobID(), null);
+			return new JobManagerMessages.CancellationSuccess(message.jobID(), savepointPathToReturn);
+		}
+	}
+
+	private static class TestSavepointActorGateway extends TestActorGateway<JobManagerMessages.TriggerSavepoint, JobManagerMessages.TriggerSavepointSuccess> {
+
+		private final JobID expectedJobID;
+		private final String expectedTargetDirectory;
+		private final String savepointPathToReturn;
+
+		private TestSavepointActorGateway(JobID expectedJobID, String expectedTargetDirectory, String savepointPathToReturn) {
+			super(JobManagerMessages.TriggerSavepoint.class);
+			this.expectedJobID = expectedJobID;
+			this.expectedTargetDirectory = expectedTargetDirectory;
+			this.savepointPathToReturn = savepointPathToReturn;
+		}
+
+		@Override
+		public JobManagerMessages.TriggerSavepointSuccess process(JobManagerMessages.TriggerSavepoint message) {
+			Assert.assertEquals(expectedJobID, message.jobId());
+			if (expectedTargetDirectory == null) {
+				Assert.assertTrue(message.savepointDirectory().isEmpty());
+			} else {
+				Assert.assertEquals(expectedTargetDirectory, message.savepointDirectory().get());
+			}
+			return new JobManagerMessages.TriggerSavepointSuccess(message.jobId(), 0, savepointPathToReturn, 0);
 		}
 	}
 


[5/5] flink git commit: [FLINK-7780] [REST] Define savepoint trigger protocol

Posted by ch...@apache.org.
[FLINK-7780] [REST] Define savepoint trigger protocol

This closes #4789.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7697a325
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7697a325
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7697a325

Branch: refs/heads/master
Commit: 7697a3254d9aa58057c6b10270d20ba142f9d775
Parents: e8e1e33
Author: zentol <ch...@apache.org>
Authored: Mon Oct 9 18:09:36 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Oct 11 11:59:09 2017 +0200

----------------------------------------------------------------------
 .../client/program/rest/RestClusterClient.java  | 23 +++++-
 .../program/rest/RestClusterClientTest.java     | 75 +++++++++++++++++++
 .../savepoints/SavepointMessageParameters.java  | 46 ++++++++++++
 .../SavepointTargetDirectoryParameter.java      | 41 ++++++++++
 .../job/savepoints/SavepointTriggerHeaders.java | 79 ++++++++++++++++++++
 .../SavepointTriggerResponseBody.java           | 70 +++++++++++++++++
 .../SavepointTriggerResponseBodyTest.java       | 37 +++++++++
 7 files changed, 370 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7697a325/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index a37ee63..8ae18af 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -40,6 +40,9 @@ import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerResponseBody;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 
 import javax.annotation.Nullable;
@@ -168,7 +171,25 @@ public class RestClusterClient extends ClusterClient {
 
 	@Override
 	public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirectory) throws Exception {
-		throw new UnsupportedOperationException();
+		throw new UnsupportedOperationException("Not implemented yet.");
+	}
+
+	@Override
+	public CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory) throws Exception {
+		SavepointTriggerHeaders headers = SavepointTriggerHeaders.getInstance();
+		SavepointMessageParameters params = headers.getUnresolvedMessageParameters();
+		params.jobID.resolve(jobId);
+		if (savepointDirectory != null) {
+			params.targetDirectory.resolve(Collections.singletonList(savepointDirectory));
+		}
+		CompletableFuture<SavepointTriggerResponseBody> responseFuture = restClient.sendRequest(
+			restClusterClientConfiguration.getRestServerAddress(),
+			restClusterClientConfiguration.getRestServerPort(),
+			headers,
+			params
+		);
+		return responseFuture
+			.thenApply(response -> response.location);
 	}
 
 	// ======================================

http://git-wip-us.apache.org/repos/asf/flink/blob/7697a325/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 7c71fd0..00c37a6 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -47,6 +47,10 @@ import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTargetDirectoryParameter;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerResponseBody;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.TestLogger;
@@ -60,6 +64,7 @@ import javax.annotation.Nonnull;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
@@ -182,6 +187,76 @@ public class RestClusterClientTest extends TestLogger {
 		}
 	}
 
+	@Test
+	public void testTriggerSavepoint() throws Exception {
+
+		Configuration config = new Configuration();
+		config.setString(JobManagerOptions.ADDRESS, "localhost");
+
+		RestServerEndpointConfiguration rsec = RestServerEndpointConfiguration.fromConfiguration(config);
+
+		String targetSavepointDirectory = "/alternate";
+
+		TestSavepointTriggerHandler triggerHandler = new TestSavepointTriggerHandler(targetSavepointDirectory);
+
+		RestServerEndpoint rse = new RestServerEndpoint(rsec) {
+			@Override
+			protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
+
+				Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>();
+				handlers.add(Tuple2.of(triggerHandler.getMessageHeaders(), triggerHandler));
+				return handlers;
+			}
+		};
+
+		RestClusterClient rcc = new RestClusterClient(config);
+		try {
+			rse.start();
+
+			JobID id = new JobID();
+
+			{
+				CompletableFuture<String> savepointPathFuture = rcc.triggerSavepoint(id, null);
+				String savepointPath = savepointPathFuture.get();
+				Assert.assertEquals("/universe", savepointPath);
+			}
+
+			{
+				CompletableFuture<String> savepointPathFuture = rcc.triggerSavepoint(id, targetSavepointDirectory);
+				String savepointPath = savepointPathFuture.get();
+				Assert.assertEquals(targetSavepointDirectory + "/universe", savepointPath);
+			}
+		} finally {
+			rcc.shutdown();
+			rse.shutdown(Time.seconds(5));
+		}
+	}
+
+	private static class TestSavepointTriggerHandler extends TestHandler<EmptyRequestBody, SavepointTriggerResponseBody, SavepointMessageParameters> {
+
+		private final String expectedSavepointDirectory;
+
+		TestSavepointTriggerHandler(String expectedSavepointDirectory) {
+			super(SavepointTriggerHeaders.getInstance());
+			this.expectedSavepointDirectory = expectedSavepointDirectory;
+		}
+
+		@Override
+		protected CompletableFuture<SavepointTriggerResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, SavepointMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
+			List<String> targetDirectories = request.getQueryParameter(SavepointTargetDirectoryParameter.class);
+			if (targetDirectories.isEmpty()) {
+				return CompletableFuture.completedFuture(new SavepointTriggerResponseBody("growing", "/universe", "big-bang"));
+			} else {
+				String targetDir = targetDirectories.get(0);
+				if (targetDir.equals(expectedSavepointDirectory)) {
+					return CompletableFuture.completedFuture(new SavepointTriggerResponseBody("growing", targetDir + "/universe", "big-bang"));
+				} else {
+					return CompletableFuture.completedFuture(new SavepointTriggerResponseBody("growing", "savepoint directory (" + targetDir + ") did not match expected (" + expectedSavepointDirectory + ')', "big-bang"));
+				}
+			}
+		}
+	}
+
 	private abstract static class TestHandler<R extends RequestBody, P extends ResponseBody, M extends MessageParameters> extends AbstractRestHandler<DispatcherGateway, R, P, M> {
 
 		private TestHandler(MessageHeaders<R, P, M> headers) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7697a325/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointMessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointMessageParameters.java
new file mode 100644
index 0000000..00e52ca
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointMessageParameters.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.savepoints;
+
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * The parameters for triggering a savepoint.
+ */
+public class SavepointMessageParameters extends MessageParameters {
+
+	public JobIDPathParameter jobID = new JobIDPathParameter();
+	public SavepointTargetDirectoryParameter targetDirectory = new SavepointTargetDirectoryParameter();
+
+	@Override
+	public Collection<MessagePathParameter<?>> getPathParameters() {
+		return Collections.singleton(jobID);
+	}
+
+	@Override
+	public Collection<MessageQueryParameter<?>> getQueryParameters() {
+		return Collections.singleton(targetDirectory);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7697a325/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTargetDirectoryParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTargetDirectoryParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTargetDirectoryParameter.java
new file mode 100644
index 0000000..76b7dbd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTargetDirectoryParameter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.savepoints;
+
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+/**
+ * The parameter denoting the directory where to which a savepoint should be written to.
+ */
+public class SavepointTargetDirectoryParameter extends MessageQueryParameter<String> {
+
+	SavepointTargetDirectoryParameter() {
+		super("targetDirectory", MessageParameterRequisiteness.OPTIONAL);
+	}
+
+	@Override
+	public String convertValueFromString(String value) {
+		return value;
+	}
+
+	@Override
+	public String convertStringToValue(String value) {
+		return value;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7697a325/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerHeaders.java
new file mode 100644
index 0000000..62ef4d5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerHeaders.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.savepoints;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * These headers define the protocol for triggering a savepoint.
+ */
+public class SavepointTriggerHeaders implements MessageHeaders<EmptyRequestBody, SavepointTriggerResponseBody, SavepointMessageParameters> {
+
+	private static final SavepointTriggerHeaders INSTANCE = new SavepointTriggerHeaders();
+
+	private SavepointTriggerHeaders() {
+	}
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public Class<SavepointTriggerResponseBody> getResponseClass() {
+		return SavepointTriggerResponseBody.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.ACCEPTED;
+	}
+
+	@Override
+	public SavepointMessageParameters getUnresolvedMessageParameters() {
+		return new SavepointMessageParameters();
+	}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.POST;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		/*
+		Note: this is different to the existing implementation for which the targetDirectory is a path parameter
+		Having it as a path parameter has several downsides as it
+			- is optional (which we only allow for query parameters)
+			- causes parsing issues, since the path is not reliably treated as a single parameter
+			- does not denote a hierarchy which path parameters are supposed to do
+			- interacts badly with the POST spec, as it would require the progress url to also contain the targetDirectory
+		 */
+
+		return "/jobs/:jobid/savepoints";
+	}
+
+	public static SavepointTriggerHeaders getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7697a325/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerResponseBody.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerResponseBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerResponseBody.java
new file mode 100644
index 0000000..2b2dcb8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerResponseBody.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.savepoints;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Response to the triggering of a savepoint.
+ */
+public class SavepointTriggerResponseBody implements ResponseBody {
+
+	private static final String FIELD_NAME_STATUS = "status";
+
+	private static final String FIELD_NAME_LOCATION = "location";
+
+	private static final String FIELD_NAME_REQUEST_ID = "request-id";
+
+	@JsonProperty(FIELD_NAME_STATUS)
+	public final String status;
+
+	@JsonProperty(FIELD_NAME_LOCATION)
+	public final String location;
+
+	@JsonProperty(FIELD_NAME_REQUEST_ID)
+	public final String requestId;
+
+	@JsonCreator
+	public SavepointTriggerResponseBody(
+			@JsonProperty(FIELD_NAME_STATUS) String status,
+			@JsonProperty(FIELD_NAME_LOCATION) String location,
+			@JsonProperty(FIELD_NAME_REQUEST_ID) String requestId) {
+		this.status = status;
+		this.location = Preconditions.checkNotNull(location);
+		this.requestId = requestId;
+	}
+
+	@Override
+	public int hashCode() {
+		return 79 * location.hashCode();
+	}
+
+	@Override
+	public boolean equals(Object object) {
+		if (object instanceof SavepointTriggerResponseBody) {
+			SavepointTriggerResponseBody other = (SavepointTriggerResponseBody) object;
+			return this.location.equals(other.location);
+		}
+		return false;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7697a325/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerResponseBodyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerResponseBodyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerResponseBodyTest.java
new file mode 100644
index 0000000..5f7f918
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerResponseBodyTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.savepoints;
+
+import org.apache.flink.runtime.rest.handler.legacy.messages.RestResponseMarshallingTestBase;
+
+/**
+ * Tests for the {@link SavepointTriggerResponseBody}.
+ */
+public class SavepointTriggerResponseBodyTest extends RestResponseMarshallingTestBase<SavepointTriggerResponseBody> {
+
+	@Override
+	protected Class<SavepointTriggerResponseBody> getTestResponseClass() {
+		return SavepointTriggerResponseBody.class;
+	}
+
+	@Override
+	protected SavepointTriggerResponseBody getTestResponseInstance() throws Exception {
+		return new SavepointTriggerResponseBody("growing", "/universe", "big-bang");
+	}
+}


[2/5] flink git commit: [refactor] [tests] Generalize gateway mocking in ClusterClientTest

Posted by ch...@apache.org.
[refactor] [tests] Generalize gateway mocking in ClusterClientTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c4430e67
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c4430e67
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c4430e67

Branch: refs/heads/master
Commit: c4430e67feae169f28407be6f292715be018da84
Parents: 593ce53
Author: zentol <ch...@apache.org>
Authored: Tue Oct 10 13:22:59 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Oct 11 11:56:59 2017 +0200

----------------------------------------------------------------------
 .../flink/client/program/ClusterClientTest.java | 71 +++++++++++++-------
 1 file changed, 47 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c4430e67/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
index 98c7d26..ad34864 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
@@ -133,50 +133,38 @@ public class ClusterClientTest extends TestLogger {
 		}
 	}
 
-	private static class TestCancelActorGateway extends DummyActorGateway {
+	private static class TestCancelActorGateway extends TestActorGateway<JobManagerMessages.CancelJob, JobManagerMessages.CancellationSuccess> {
 
 		private final JobID expectedJobID;
-		private volatile boolean messageArrived = false;
 
 		TestCancelActorGateway(JobID expectedJobID) {
+			super(JobManagerMessages.CancelJob.class);
 			this.expectedJobID = expectedJobID;
 		}
 
 		@Override
-		public Future<Object> ask(Object message, FiniteDuration timeout) {
-			messageArrived = true;
-			if (message instanceof JobManagerMessages.CancelJob) {
-				JobManagerMessages.CancelJob cancelJob = (JobManagerMessages.CancelJob) message;
-				Assert.assertEquals(expectedJobID, cancelJob.jobID());
-				return Future$.MODULE$.successful(new JobManagerMessages.CancellationSuccess(cancelJob.jobID(), null));
-			}
-			Assert.fail("Expected CancelJob message, got: " + message.getClass());
-			return null;
+		public JobManagerMessages.CancellationSuccess process(JobManagerMessages.CancelJob message) {
+			Assert.assertEquals(expectedJobID, message.jobID());
+			return new JobManagerMessages.CancellationSuccess(message.jobID(), null);
 		}
 	}
 
-	private static class TestCancelWithSavepointActorGateway extends DummyActorGateway {
+	private static class TestCancelWithSavepointActorGateway extends TestActorGateway<JobManagerMessages.CancelJobWithSavepoint, JobManagerMessages.CancellationSuccess> {
 
 		private final JobID expectedJobID;
 		private final String expectedTargetDirectory;
-		private volatile boolean messageArrived = false;
 
 		TestCancelWithSavepointActorGateway(JobID expectedJobID, String expectedTargetDirectory) {
+			super(JobManagerMessages.CancelJobWithSavepoint.class);
 			this.expectedJobID = expectedJobID;
 			this.expectedTargetDirectory = expectedTargetDirectory;
 		}
 
 		@Override
-		public Future<Object> ask(Object message, FiniteDuration timeout) {
-			messageArrived = true;
-			if (message instanceof JobManagerMessages.CancelJobWithSavepoint) {
-				JobManagerMessages.CancelJobWithSavepoint cancelJob = (JobManagerMessages.CancelJobWithSavepoint) message;
-				Assert.assertEquals(expectedJobID, cancelJob.jobID());
-				Assert.assertEquals(expectedTargetDirectory, cancelJob.savepointDirectory());
-				return Future$.MODULE$.successful(new JobManagerMessages.CancellationSuccess(cancelJob.jobID(), null));
-			}
-			Assert.fail("Expected CancelJobWithSavepoint message, got: " + message.getClass());
-			return null;
+		public JobManagerMessages.CancellationSuccess process(JobManagerMessages.CancelJobWithSavepoint message) {
+			Assert.assertEquals(expectedJobID, message.jobID());
+			Assert.assertEquals(expectedTargetDirectory, message.savepointDirectory());
+			return new JobManagerMessages.CancellationSuccess(message.jobID(), null);
 		}
 	}
 
@@ -184,7 +172,7 @@ public class ClusterClientTest extends TestLogger {
 
 		private final ActorGateway jobmanagerGateway;
 
-		public TestClusterClient(Configuration config, ActorGateway jobmanagerGateway) throws Exception {
+		TestClusterClient(Configuration config, ActorGateway jobmanagerGateway) throws Exception {
 			super(config);
 			this.jobmanagerGateway = jobmanagerGateway;
 		}
@@ -194,4 +182,39 @@ public class ClusterClientTest extends TestLogger {
 			return jobmanagerGateway;
 		}
 	}
+
+	/**
+	 * Utility class for hiding akka/scala details.
+	 *
+	 * @param <M> expected type of incoming requests
+	 * @param <R> type of outgoing requests
+	 */
+	private abstract static class TestActorGateway<M, R> extends DummyActorGateway {
+		private final Class<M> messageClass;
+		volatile boolean messageArrived = false;
+
+		TestActorGateway(Class<M> messageClass) {
+			this.messageClass = messageClass;
+		}
+
+		@Override
+		@SuppressWarnings("unchecked")
+		public Future<Object> ask(Object message, FiniteDuration timeout) {
+			messageArrived = true;
+			if (message.getClass().isAssignableFrom(messageClass)) {
+				return Future$.MODULE$.successful(process((M) message));
+			}
+			Assert.fail("Expected TriggerSavepoint message, got: " + message.getClass());
+			return null;
+		}
+
+		/**
+		 * Processes the incoming message and verifies it's correctness. Implementations may directly throw unchecked
+		 * exceptions (like JUnit asserts) in case of errors or faulty behaviors.
+		 *
+		 * @param message incoming message
+		 * @return response in case of success
+		 */
+		public abstract R process(M message);
+	}
 }


[3/5] flink git commit: [refactor] [tests] Refactor CliFrontend mocking into utility class

Posted by ch...@apache.org.
[refactor] [tests] Refactor CliFrontend mocking into utility class


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/593ce53f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/593ce53f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/593ce53f

Branch: refs/heads/master
Commit: 593ce53f57412b714cd16c5b677dc4726c0b5eff
Parents: 367e430
Author: zentol <ch...@apache.org>
Authored: Mon Oct 9 13:06:06 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Oct 11 11:56:59 2017 +0200

----------------------------------------------------------------------
 .../flink/client/CliFrontendListCancelTest.java | 20 +-------
 .../flink/client/CliFrontendStopTest.java       | 21 +-------
 .../flink/client/util/MockedCliFrontend.java    | 54 ++++++++++++++++++++
 3 files changed, 58 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/593ce53f/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
index e52dde1..b01d162 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
@@ -22,10 +22,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.cli.CancelOptions;
 import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.CommandLineOptions;
-import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.client.cli.Flip6DefaultCLI;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.client.util.MockedCliFrontend;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
@@ -36,7 +34,6 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.actor.Status;
 import akka.testkit.JavaTestKit;
-import org.apache.commons.cli.CommandLine;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -55,8 +52,6 @@ import static org.mockito.Matchers.isNull;
 import static org.mockito.Matchers.notNull;
 import static org.mockito.Mockito.times;
 import static org.powermock.api.mockito.PowerMockito.doThrow;
-import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.when;
 
 /**
  * Tests for the CANCEL and LIST commands.
@@ -220,25 +215,14 @@ public class CliFrontendListCancelTest {
 		}
 	}
 
-	private static final class CancelTestCliFrontend extends CliFrontend {
-		private final ClusterClient client;
+	private static final class CancelTestCliFrontend extends MockedCliFrontend {
 
 		CancelTestCliFrontend(boolean reject) throws Exception {
-			super(CliFrontendTestUtils.getConfigDir());
-			this.client = mock(ClusterClient.class);
 			if (reject) {
 				doThrow(new IllegalArgumentException("Test exception")).when(client).cancel(any(JobID.class));
 				doThrow(new IllegalArgumentException("Test exception")).when(client).cancelWithSavepoint(any(JobID.class), anyString());
 			}
 		}
-
-		@Override
-		public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine) {
-			CustomCommandLine ccl = mock(CustomCommandLine.class);
-			when(ccl.retrieveCluster(any(CommandLine.class), any(Configuration.class), anyString()))
-				.thenReturn(client);
-			return ccl;
-		}
 	}
 
 	private static final class InfoListTestCliFrontend extends CliFrontend {

http://git-wip-us.apache.org/repos/asf/flink/blob/593ce53f/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
index ab81713..d10b31c 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
@@ -20,14 +20,11 @@ package org.apache.flink.client;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.cli.CliFrontendParser;
-import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.client.cli.Flip6DefaultCLI;
 import org.apache.flink.client.cli.StopOptions;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.client.util.MockedCliFrontend;
 import org.apache.flink.util.TestLogger;
 
-import org.apache.commons.cli.CommandLine;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -36,11 +33,8 @@ import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.times;
 import static org.powermock.api.mockito.PowerMockito.doThrow;
-import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.when;
 
 /**
  * Tests for the STOP command.
@@ -105,23 +99,12 @@ public class CliFrontendStopTest extends TestLogger {
 		}
 	}
 
-	private static final class StopTestCliFrontend extends CliFrontend {
-		private final ClusterClient client;
+	private static final class StopTestCliFrontend extends MockedCliFrontend {
 
 		StopTestCliFrontend(boolean reject) throws Exception {
-			super(CliFrontendTestUtils.getConfigDir());
-			this.client = mock(ClusterClient.class);
 			if (reject) {
 				doThrow(new IllegalArgumentException("Test exception")).when(client).stop(any(JobID.class));
 			}
 		}
-
-		@Override
-		public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine) {
-			CustomCommandLine ccl = mock(CustomCommandLine.class);
-			when(ccl.retrieveCluster(any(CommandLine.class), any(Configuration.class), anyString()))
-				.thenReturn(client);
-			return ccl;
-		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/593ce53f/flink-clients/src/test/java/org/apache/flink/client/util/MockedCliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/util/MockedCliFrontend.java b/flink-clients/src/test/java/org/apache/flink/client/util/MockedCliFrontend.java
new file mode 100644
index 0000000..c121c25
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/util/MockedCliFrontend.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.util;
+
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.CliFrontendTestUtils;
+import org.apache.flink.client.cli.CustomCommandLine;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+
+import org.apache.commons.cli.CommandLine;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Utility class for mocking the {@link ClusterClient} within a {@link CliFrontend}.
+ *
+ * <p>The mocking behavior can be defined in the constructor of the sub-class.
+ */
+public class MockedCliFrontend extends CliFrontend {
+	public final ClusterClient client;
+
+	protected MockedCliFrontend() throws Exception {
+		super(CliFrontendTestUtils.getConfigDir());
+		this.client = mock(ClusterClient.class);
+	}
+
+	@Override
+	public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine) {
+		CustomCommandLine ccl = mock(CustomCommandLine.class);
+		when(ccl.retrieveCluster(any(CommandLine.class), any(Configuration.class), anyString()))
+			.thenReturn(client);
+		return ccl;
+	}
+}