You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/10/11 11:53:54 UTC
[1/5] flink git commit: [refactor] [tests] Generalize test handler generation
Repository: flink
Updated Branches:
refs/heads/master 367e430e5 -> 7697a3254
[refactor] [tests] Generalize test handler generation
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/90eb9028
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/90eb9028
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/90eb9028
Branch: refs/heads/master
Commit: 90eb902830749ab30e85c5c3484c0f3e65321e18
Parents: c4430e6
Author: zentol <ch...@apache.org>
Authored: Tue Oct 10 13:23:21 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Oct 11 11:56:59 2017 +0200
----------------------------------------------------------------------
.../program/rest/RestClusterClientTest.java | 39 +++++++++++---------
1 file changed, 21 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/90eb9028/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 617dd38..7c71fd0 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -39,6 +39,10 @@ import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
@@ -128,15 +132,11 @@ public class RestClusterClientTest extends TestLogger {
}
}
- private static class TestBlobServerPortHandler extends AbstractRestHandler<DispatcherGateway, EmptyRequestBody, BlobServerPortResponseBody, EmptyMessageParameters> {
+ private static class TestBlobServerPortHandler extends TestHandler<EmptyRequestBody, BlobServerPortResponseBody, EmptyMessageParameters> {
private volatile boolean portRetrieved = false;
private TestBlobServerPortHandler() {
- super(
- CompletableFuture.completedFuture(restAddress),
- mockGatewayRetriever,
- RpcUtils.INF_TIMEOUT,
- BlobServerPortHeaders.getInstance());
+ super(BlobServerPortHeaders.getInstance());
}
@Override
@@ -146,15 +146,11 @@ public class RestClusterClientTest extends TestLogger {
}
}
- private static class TestJobSubmitHandler extends AbstractRestHandler<DispatcherGateway, JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> {
+ private static class TestJobSubmitHandler extends TestHandler<JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> {
private volatile boolean jobSubmitted = false;
private TestJobSubmitHandler() {
- super(
- CompletableFuture.completedFuture(restAddress),
- mockGatewayRetriever,
- RpcUtils.INF_TIMEOUT,
- JobSubmitHeaders.getInstance());
+ super(JobSubmitHeaders.getInstance());
}
@Override
@@ -164,16 +160,12 @@ public class RestClusterClientTest extends TestLogger {
}
}
- private static class TestJobTerminationHandler extends AbstractRestHandler<DispatcherGateway, EmptyRequestBody, EmptyResponseBody, JobTerminationMessageParameters> {
+ private static class TestJobTerminationHandler extends TestHandler<EmptyRequestBody, EmptyResponseBody, JobTerminationMessageParameters> {
private volatile boolean jobCanceled = false;
private volatile boolean jobStopped = false;
private TestJobTerminationHandler() {
- super(
- CompletableFuture.completedFuture(restAddress),
- mockGatewayRetriever,
- RpcUtils.INF_TIMEOUT,
- JobTerminationHeaders.getInstance());
+ super(JobTerminationHeaders.getInstance());
}
@Override
@@ -189,4 +181,15 @@ public class RestClusterClientTest extends TestLogger {
return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
}
}
+
+ private abstract static class TestHandler<R extends RequestBody, P extends ResponseBody, M extends MessageParameters> extends AbstractRestHandler<DispatcherGateway, R, P, M> {
+
+ private TestHandler(MessageHeaders<R, P, M> headers) {
+ super(
+ CompletableFuture.completedFuture(restAddress),
+ mockGatewayRetriever,
+ RpcUtils.INF_TIMEOUT,
+ headers);
+ }
+ }
}
[4/5] flink git commit: [FLINK-7780] [Client] Move savepoint logic into ClusterClient
Posted by ch...@apache.org.
[FLINK-7780] [Client] Move savepoint logic into ClusterClient
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e8e1e330
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e8e1e330
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e8e1e330
Branch: refs/heads/master
Commit: e8e1e330a62bcdad939c896ab807362cc346278b
Parents: 90eb902
Author: zentol <ch...@apache.org>
Authored: Mon Oct 9 13:34:52 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Oct 11 11:58:54 2017 +0200
----------------------------------------------------------------------
.../org/apache/flink/client/CliFrontend.java | 49 ++++----
.../flink/client/program/ClusterClient.java | 34 +++++
.../flink/client/CliFrontendSavepointTest.java | 126 ++++++-------------
.../flink/client/program/ClusterClientTest.java | 58 ++++++++-
4 files changed, 144 insertions(+), 123 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e8e1e330/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 9be8295..c065453 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -61,11 +61,11 @@ import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
-import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
@@ -89,16 +89,15 @@ import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
-import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure;
-import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointFailure;
/**
* Implementation of a simple command line frontend for executing programs.
@@ -726,35 +725,29 @@ public class CliFrontend {
*/
private int triggerSavepoint(SavepointOptions options, JobID jobId, String savepointDirectory) {
try {
- ActorGateway jobManager = getJobManagerGateway(options);
-
- logAndSysout("Triggering savepoint for job " + jobId + ".");
- Future<Object> response = jobManager.ask(new TriggerSavepoint(jobId, Option.apply(savepointDirectory)),
- new FiniteDuration(1, TimeUnit.HOURS));
-
- Object result;
+ CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(options.getCommandLine());
+ ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory);
try {
- logAndSysout("Waiting for response...");
- result = Await.result(response, FiniteDuration.Inf());
- }
- catch (Exception e) {
- throw new Exception("Triggering a savepoint for the job " + jobId + " failed.", e);
- }
+ logAndSysout("Triggering savepoint for job " + jobId + ".");
+ CompletableFuture<String> savepointPathFuture = client.triggerSavepoint(jobId, savepointDirectory);
- if (result instanceof TriggerSavepointSuccess) {
- TriggerSavepointSuccess success = (TriggerSavepointSuccess) result;
- logAndSysout("Savepoint completed. Path: " + success.savepointPath());
+ String savepointPath;
+ try {
+ logAndSysout("Waiting for response...");
+ savepointPath = savepointPathFuture.get();
+ }
+ catch (ExecutionException ee) {
+ Throwable cause = ExceptionUtils.stripExecutionException(ee);
+ throw new FlinkException("Triggering a savepoint for the job " + jobId + " failed.", cause);
+ }
+
+ logAndSysout("Savepoint completed. Path: " + savepointPath);
logAndSysout("You can resume your program from this savepoint with the run command.");
return 0;
}
- else if (result instanceof TriggerSavepointFailure) {
- TriggerSavepointFailure failure = (TriggerSavepointFailure) result;
- throw failure.cause();
- }
- else {
- throw new IllegalStateException("Unknown JobManager response of type " +
- result.getClass());
+ finally {
+ client.shutdown();
}
}
catch (Throwable t) {
http://git-wip-us.apache.org/repos/asf/flink/blob/e8e1e330/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 78455c1..eb89f09 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobListeningContext;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.instance.ActorGateway;
@@ -72,6 +73,9 @@ import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
import scala.Option;
import scala.Tuple2;
@@ -650,6 +654,36 @@ public abstract class ClusterClient {
}
/**
+ * Triggers a savepoint for the job identified by the job id. The savepoint will be written to the given savepoint
+ * directory, or {@link org.apache.flink.configuration.CoreOptions#SAVEPOINT_DIRECTORY} if it is null.
+ *
+ * @param jobId job id
+ * @param savepointDirectory directory the savepoint should be written to
+ * @return path future where the savepoint is located
+ * @throws Exception if no connection to the cluster could be established
+ */
+ public CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory) throws Exception {
+ final ActorGateway jobManager = getJobManagerGateway();
+
+ Future<Object> response = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobId, Option.apply(savepointDirectory)),
+ new FiniteDuration(1, TimeUnit.HOURS));
+ CompletableFuture<Object> responseFuture = FutureUtils.toJava(response);
+
+ return responseFuture.thenApply((responseMessage) -> {
+ if (responseMessage instanceof JobManagerMessages.TriggerSavepointSuccess) {
+ JobManagerMessages.TriggerSavepointSuccess success = (JobManagerMessages.TriggerSavepointSuccess) responseMessage;
+ return success.savepointPath();
+ } else if (responseMessage instanceof JobManagerMessages.TriggerSavepointFailure) {
+ JobManagerMessages.TriggerSavepointFailure failure = (JobManagerMessages.TriggerSavepointFailure) responseMessage;
+ throw new CompletionException(failure.cause());
+ } else {
+ throw new CompletionException(
+ new IllegalStateException("Unknown JobManager response of type " + responseMessage.getClass()));
+ }
+ });
+ }
+
+ /**
* Requests and returns the accumulators for the given job identifier. Accumulators can be
* requested while a is running or after it has finished. The default class loader is used
* to deserialize the incoming accumulator results.
http://git-wip-us.apache.org/repos/asf/flink/blob/e8e1e330/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
index cfed859..1f0d356 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.client;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.cli.CommandLineOptions;
+import org.apache.flink.client.util.MockedCliFrontend;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -33,22 +35,23 @@ import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.PrintStream;
+import java.util.concurrent.CompletableFuture;
import java.util.zip.ZipOutputStream;
-import scala.Option;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.duration.FiniteDuration;
import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure;
-import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
-import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointFailure;
-import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
import static org.apache.flink.runtime.messages.JobManagerMessages.getDisposeSavepointSuccess;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -76,31 +79,19 @@ public class CliFrontendSavepointTest {
try {
JobID jobId = new JobID();
- ActorGateway jobManager = mock(ActorGateway.class);
-
- Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>();
-
- when(jobManager.ask(
- Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())),
- any(FiniteDuration.class)))
- .thenReturn(triggerResponse.future());
String savepointPath = "expectedSavepointPath";
- triggerResponse.success(new TriggerSavepointSuccess(jobId, -1, savepointPath, -1));
-
- CliFrontend frontend = new MockCliFrontend(
- CliFrontendTestUtils.getConfigDir(), jobManager);
+ MockedCliFrontend frontend = new SavepointTestCliFrontend(savepointPath);
String[] parameters = { jobId.toString() };
int returnCode = frontend.savepoint(parameters);
assertEquals(0, returnCode);
- verify(jobManager, times(1)).ask(
- Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())),
- any(FiniteDuration.class));
+ verify(frontend.client, times(1))
+ .triggerSavepoint(eq(jobId), isNull(String.class));
- assertTrue(buffer.toString().contains("expectedSavepointPath"));
+ assertTrue(buffer.toString().contains(savepointPath));
}
finally {
restoreStdOutAndStdErr();
@@ -113,29 +104,17 @@ public class CliFrontendSavepointTest {
try {
JobID jobId = new JobID();
- ActorGateway jobManager = mock(ActorGateway.class);
-
- Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>();
-
- when(jobManager.ask(
- Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())),
- any(FiniteDuration.class)))
- .thenReturn(triggerResponse.future());
Exception testException = new Exception("expectedTestException");
- triggerResponse.success(new TriggerSavepointFailure(jobId, testException));
-
- CliFrontend frontend = new MockCliFrontend(
- CliFrontendTestUtils.getConfigDir(), jobManager);
+ MockedCliFrontend frontend = new SavepointTestCliFrontend(testException);
String[] parameters = { jobId.toString() };
int returnCode = frontend.savepoint(parameters);
- assertTrue(returnCode != 0);
- verify(jobManager, times(1)).ask(
- Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())),
- any(FiniteDuration.class));
+ assertNotEquals(0, returnCode);
+ verify(frontend.client, times(1))
+ .triggerSavepoint(eq(jobId), isNull(String.class));
assertTrue(buffer.toString().contains("expectedTestException"));
}
@@ -162,46 +141,9 @@ public class CliFrontendSavepointTest {
}
}
- @Test
- public void testTriggerSavepointFailureUnknownResponse() throws Exception {
- replaceStdOutAndStdErr();
-
- try {
- JobID jobId = new JobID();
- ActorGateway jobManager = mock(ActorGateway.class);
-
- Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>();
-
- when(jobManager.ask(
- Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())),
- any(FiniteDuration.class)))
- .thenReturn(triggerResponse.future());
-
- triggerResponse.success("UNKNOWN RESPONSE");
-
- CliFrontend frontend = new MockCliFrontend(
- CliFrontendTestUtils.getConfigDir(), jobManager);
-
- String[] parameters = { jobId.toString() };
- int returnCode = frontend.savepoint(parameters);
-
- assertTrue(returnCode != 0);
- verify(jobManager, times(1)).ask(
- Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())),
- any(FiniteDuration.class));
-
- String errMsg = buffer.toString();
- assertTrue(errMsg.contains("IllegalStateException"));
- assertTrue(errMsg.contains("Unknown JobManager response"));
- }
- finally {
- restoreStdOutAndStdErr();
- }
- }
-
/**
* Tests that a CLI call with a custom savepoint directory target is
- * forwarded correctly to the JM.
+ * forwarded correctly to the cluster client.
*/
@Test
public void testTriggerSavepointCustomTarget() throws Exception {
@@ -209,30 +151,19 @@ public class CliFrontendSavepointTest {
try {
JobID jobId = new JobID();
- Option<String> customTarget = Option.apply("customTargetDirectory");
- ActorGateway jobManager = mock(ActorGateway.class);
- Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>();
+ String savepointDirectory = "customTargetDirectory";
- when(jobManager.ask(
- Mockito.eq(new TriggerSavepoint(jobId, customTarget)),
- any(FiniteDuration.class)))
- .thenReturn(triggerResponse.future());
- String savepointPath = "expectedSavepointPath";
- triggerResponse.success(new TriggerSavepointSuccess(jobId, -1, savepointPath, -1));
+ MockedCliFrontend frontend = new SavepointTestCliFrontend(savepointDirectory);
- CliFrontend frontend = new MockCliFrontend(
- CliFrontendTestUtils.getConfigDir(), jobManager);
-
- String[] parameters = { jobId.toString(), customTarget.get() };
+ String[] parameters = { jobId.toString(), savepointDirectory };
int returnCode = frontend.savepoint(parameters);
assertEquals(0, returnCode);
- verify(jobManager, times(1)).ask(
- Mockito.eq(new TriggerSavepoint(jobId, customTarget)),
- any(FiniteDuration.class));
+ verify(frontend.client, times(1))
+ .triggerSavepoint(eq(jobId), eq(savepointDirectory));
- assertTrue(buffer.toString().contains("expectedSavepointPath"));
+ assertTrue(buffer.toString().contains(savepointDirectory));
}
finally {
restoreStdOutAndStdErr();
@@ -444,4 +375,17 @@ public class CliFrontendSavepointTest {
System.setOut(stdOut);
System.setErr(stdErr);
}
+
+ private static final class SavepointTestCliFrontend extends MockedCliFrontend {
+
+ SavepointTestCliFrontend(String expectedResponse) throws Exception {
+ when(client.triggerSavepoint(any(JobID.class), anyString()))
+ .thenReturn(CompletableFuture.completedFuture(expectedResponse));
+ }
+
+ SavepointTestCliFrontend(Exception expectedException) throws Exception {
+ when(client.triggerSavepoint(any(JobID.class), anyString()))
+ .thenReturn(FutureUtils.completedExceptionally(expectedException));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e8e1e330/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
index ad34864..5f6d9fe 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
@@ -30,6 +30,8 @@ import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
+import java.util.concurrent.CompletableFuture;
+
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.duration.FiniteDuration;
@@ -100,12 +102,33 @@ public class ClusterClientTest extends TestLogger {
config.setString(JobManagerOptions.ADDRESS, "localhost");
JobID jobID = new JobID();
+ String savepointDirectory = "/test/directory";
+ String savepointPath = "/test/path";
+ TestCancelWithSavepointActorGateway gateway = new TestCancelWithSavepointActorGateway(jobID, savepointDirectory, savepointPath);
+ ClusterClient clusterClient = new TestClusterClient(config, gateway);
+ try {
+ String path = clusterClient.cancelWithSavepoint(jobID, savepointDirectory);
+ Assert.assertTrue(gateway.messageArrived);
+ Assert.assertEquals(savepointPath, path);
+ } finally {
+ clusterClient.shutdown();
+ }
+ }
+
+ @Test
+ public void testClusterClientSavepoint() throws Exception {
+ Configuration config = new Configuration();
+ config.setString(JobManagerOptions.ADDRESS, "localhost");
+
+ JobID jobID = new JobID();
+ String savepointDirectory = "/test/directory";
String savepointPath = "/test/path";
- TestCancelWithSavepointActorGateway gateway = new TestCancelWithSavepointActorGateway(jobID, savepointPath);
+ TestSavepointActorGateway gateway = new TestSavepointActorGateway(jobID, savepointDirectory, savepointPath);
ClusterClient clusterClient = new TestClusterClient(config, gateway);
try {
- clusterClient.cancelWithSavepoint(jobID, savepointPath);
+ CompletableFuture<String> pathFuture = clusterClient.triggerSavepoint(jobID, savepointDirectory);
Assert.assertTrue(gateway.messageArrived);
+ Assert.assertEquals(savepointPath, pathFuture.get());
} finally {
clusterClient.shutdown();
}
@@ -153,18 +176,45 @@ public class ClusterClientTest extends TestLogger {
private final JobID expectedJobID;
private final String expectedTargetDirectory;
+ private final String savepointPathToReturn;
- TestCancelWithSavepointActorGateway(JobID expectedJobID, String expectedTargetDirectory) {
+ TestCancelWithSavepointActorGateway(JobID expectedJobID, String expectedTargetDirectory, String savepointPathToReturn) {
super(JobManagerMessages.CancelJobWithSavepoint.class);
this.expectedJobID = expectedJobID;
this.expectedTargetDirectory = expectedTargetDirectory;
+ this.savepointPathToReturn = savepointPathToReturn;
}
@Override
public JobManagerMessages.CancellationSuccess process(JobManagerMessages.CancelJobWithSavepoint message) {
Assert.assertEquals(expectedJobID, message.jobID());
Assert.assertEquals(expectedTargetDirectory, message.savepointDirectory());
- return new JobManagerMessages.CancellationSuccess(message.jobID(), null);
+ return new JobManagerMessages.CancellationSuccess(message.jobID(), savepointPathToReturn);
+ }
+ }
+
+ private static class TestSavepointActorGateway extends TestActorGateway<JobManagerMessages.TriggerSavepoint, JobManagerMessages.TriggerSavepointSuccess> {
+
+ private final JobID expectedJobID;
+ private final String expectedTargetDirectory;
+ private final String savepointPathToReturn;
+
+ private TestSavepointActorGateway(JobID expectedJobID, String expectedTargetDirectory, String savepointPathToReturn) {
+ super(JobManagerMessages.TriggerSavepoint.class);
+ this.expectedJobID = expectedJobID;
+ this.expectedTargetDirectory = expectedTargetDirectory;
+ this.savepointPathToReturn = savepointPathToReturn;
+ }
+
+ @Override
+ public JobManagerMessages.TriggerSavepointSuccess process(JobManagerMessages.TriggerSavepoint message) {
+ Assert.assertEquals(expectedJobID, message.jobId());
+ if (expectedTargetDirectory == null) {
+ Assert.assertTrue(message.savepointDirectory().isEmpty());
+ } else {
+ Assert.assertEquals(expectedTargetDirectory, message.savepointDirectory().get());
+ }
+ return new JobManagerMessages.TriggerSavepointSuccess(message.jobId(), 0, savepointPathToReturn, 0);
}
}
[5/5] flink git commit: [FLINK-7780] [REST] Define savepoint trigger protocol
Posted by ch...@apache.org.
[FLINK-7780] [REST] Define savepoint trigger protocol
This closes #4789.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7697a325
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7697a325
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7697a325
Branch: refs/heads/master
Commit: 7697a3254d9aa58057c6b10270d20ba142f9d775
Parents: e8e1e33
Author: zentol <ch...@apache.org>
Authored: Mon Oct 9 18:09:36 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Oct 11 11:59:09 2017 +0200
----------------------------------------------------------------------
.../client/program/rest/RestClusterClient.java | 23 +++++-
.../program/rest/RestClusterClientTest.java | 75 +++++++++++++++++++
.../savepoints/SavepointMessageParameters.java | 46 ++++++++++++
.../SavepointTargetDirectoryParameter.java | 41 ++++++++++
.../job/savepoints/SavepointTriggerHeaders.java | 79 ++++++++++++++++++++
.../SavepointTriggerResponseBody.java | 70 +++++++++++++++++
.../SavepointTriggerResponseBodyTest.java | 37 +++++++++
7 files changed, 370 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7697a325/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index a37ee63..8ae18af 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -40,6 +40,9 @@ import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerResponseBody;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import javax.annotation.Nullable;
@@ -168,7 +171,25 @@ public class RestClusterClient extends ClusterClient {
@Override
public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirectory) throws Exception {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ @Override
+ public CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory) throws Exception {
+ SavepointTriggerHeaders headers = SavepointTriggerHeaders.getInstance();
+ SavepointMessageParameters params = headers.getUnresolvedMessageParameters();
+ params.jobID.resolve(jobId);
+ if (savepointDirectory != null) {
+ params.targetDirectory.resolve(Collections.singletonList(savepointDirectory));
+ }
+ CompletableFuture<SavepointTriggerResponseBody> responseFuture = restClient.sendRequest(
+ restClusterClientConfiguration.getRestServerAddress(),
+ restClusterClientConfiguration.getRestServerPort(),
+ headers,
+ params
+ );
+ return responseFuture
+ .thenApply(response -> response.location);
}
// ======================================
http://git-wip-us.apache.org/repos/asf/flink/blob/7697a325/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 7c71fd0..00c37a6 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -47,6 +47,10 @@ import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTargetDirectoryParameter;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerResponseBody;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.TestLogger;
@@ -60,6 +64,7 @@ import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -182,6 +187,76 @@ public class RestClusterClientTest extends TestLogger {
}
}
+ @Test
+ public void testTriggerSavepoint() throws Exception {
+
+ Configuration config = new Configuration();
+ config.setString(JobManagerOptions.ADDRESS, "localhost");
+
+ RestServerEndpointConfiguration rsec = RestServerEndpointConfiguration.fromConfiguration(config);
+
+ String targetSavepointDirectory = "/alternate";
+
+ TestSavepointTriggerHandler triggerHandler = new TestSavepointTriggerHandler(targetSavepointDirectory);
+
+ RestServerEndpoint rse = new RestServerEndpoint(rsec) {
+ @Override
+ protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
+
+ Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>();
+ handlers.add(Tuple2.of(triggerHandler.getMessageHeaders(), triggerHandler)); // the endpoint serves only the savepoint trigger handler
+ return handlers;
+ }
+ };
+
+ RestClusterClient rcc = new RestClusterClient(config);
+ try {
+ rse.start();
+
+ JobID id = new JobID();
+
+ { // case 1: no target directory passed; the test handler answers with the location "/universe"
+ CompletableFuture<String> savepointPathFuture = rcc.triggerSavepoint(id, null);
+ String savepointPath = savepointPathFuture.get();
+ Assert.assertEquals("/universe", savepointPath);
+ }
+
+ { // case 2: the expected target directory is passed; the handler prefixes the location with it
+ CompletableFuture<String> savepointPathFuture = rcc.triggerSavepoint(id, targetSavepointDirectory);
+ String savepointPath = savepointPathFuture.get();
+ Assert.assertEquals(targetSavepointDirectory + "/universe", savepointPath);
+ }
+ } finally { // NOTE(review): if rcc.shutdown() throws, rse.shutdown(...) below is skipped
+ rcc.shutdown();
+ rse.shutdown(Time.seconds(5));
+ }
+ }
+
+ private static class TestSavepointTriggerHandler extends TestHandler<EmptyRequestBody, SavepointTriggerResponseBody, SavepointMessageParameters> { // test handler registered under SavepointTriggerHeaders
+
+ private final String expectedSavepointDirectory; // the only target directory value this handler accepts as correct
+
+ TestSavepointTriggerHandler(String expectedSavepointDirectory) {
+ super(SavepointTriggerHeaders.getInstance());
+ this.expectedSavepointDirectory = expectedSavepointDirectory;
+ }
+
+ @Override
+ protected CompletableFuture<SavepointTriggerResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, SavepointMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
+ List<String> targetDirectories = request.getQueryParameter(SavepointTargetDirectoryParameter.class);
+ if (targetDirectories.isEmpty()) { // no target directory query parameter: answer with the fixed location "/universe"
+ return CompletableFuture.completedFuture(new SavepointTriggerResponseBody("growing", "/universe", "big-bang"));
+ } else {
+ String targetDir = targetDirectories.get(0); // only the first value is inspected
+ if (targetDir.equals(expectedSavepointDirectory)) {
+ return CompletableFuture.completedFuture(new SavepointTriggerResponseBody("growing", targetDir + "/universe", "big-bang"));
+ } else { // mismatch: the description of the mismatch is returned in place of the location
+ return CompletableFuture.completedFuture(new SavepointTriggerResponseBody("growing", "savepoint directory (" + targetDir + ") did not match expected (" + expectedSavepointDirectory + ')', "big-bang"));
+ }
+ }
+ }
+ }
+
private abstract static class TestHandler<R extends RequestBody, P extends ResponseBody, M extends MessageParameters> extends AbstractRestHandler<DispatcherGateway, R, P, M> {
private TestHandler(MessageHeaders<R, P, M> headers) {
http://git-wip-us.apache.org/repos/asf/flink/blob/7697a325/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointMessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointMessageParameters.java
new file mode 100644
index 0000000..00e52ca
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointMessageParameters.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.savepoints;
+
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * The parameters for triggering a savepoint.
+ *
+ * <p>{@link #jobID} is the only path parameter and {@link #targetDirectory} is the only query
+ * parameter; each is returned as a singleton collection by the corresponding getter.
+ */
+public class SavepointMessageParameters extends MessageParameters {
+
+ public JobIDPathParameter jobID = new JobIDPathParameter();
+ public SavepointTargetDirectoryParameter targetDirectory = new SavepointTargetDirectoryParameter();
+
+ @Override
+ public Collection<MessagePathParameter<?>> getPathParameters() {
+ return Collections.singleton(jobID);
+ }
+
+ @Override
+ public Collection<MessageQueryParameter<?>> getQueryParameters() {
+ return Collections.singleton(targetDirectory);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7697a325/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTargetDirectoryParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTargetDirectoryParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTargetDirectoryParameter.java
new file mode 100644
index 0000000..76b7dbd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTargetDirectoryParameter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.savepoints;
+
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+/**
+ * The parameter denoting the directory to which a savepoint should be written.
+ *
+ * <p>The parameter is constructed with the name {@code targetDirectory} and
+ * {@code MessageParameterRequisiteness.OPTIONAL}. Both conversion methods return the given
+ * string unchanged, since the value is already a plain string.
+ */
+public class SavepointTargetDirectoryParameter extends MessageQueryParameter<String> {
+
+ SavepointTargetDirectoryParameter() {
+ super("targetDirectory", MessageParameterRequisiteness.OPTIONAL);
+ }
+
+ @Override
+ public String convertValueFromString(String value) {
+ return value;
+ }
+
+ @Override
+ public String convertStringToValue(String value) {
+ return value;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7697a325/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerHeaders.java
new file mode 100644
index 0000000..62ef4d5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerHeaders.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.savepoints;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * These headers define the protocol for triggering a savepoint.
+ *
+ * <p>The request is a {@code POST} to {@code /jobs/:jobid/savepoints} with an
+ * {@link EmptyRequestBody}; the response is a {@link SavepointTriggerResponseBody} sent with
+ * status {@code ACCEPTED}. The class is a singleton that is obtained via {@link #getInstance()}.
+ */
+public class SavepointTriggerHeaders implements MessageHeaders<EmptyRequestBody, SavepointTriggerResponseBody, SavepointMessageParameters> {
+
+ private static final SavepointTriggerHeaders INSTANCE = new SavepointTriggerHeaders();
+
+ private SavepointTriggerHeaders() {
+ }
+
+ @Override
+ public Class<EmptyRequestBody> getRequestClass() {
+ return EmptyRequestBody.class;
+ }
+
+ @Override
+ public Class<SavepointTriggerResponseBody> getResponseClass() {
+ return SavepointTriggerResponseBody.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.ACCEPTED;
+ }
+
+ @Override
+ public SavepointMessageParameters getUnresolvedMessageParameters() {
+ return new SavepointMessageParameters(); // a fresh parameter object per call
+ }
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.POST;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ /*
+ Note: this is different to the existing implementation for which the targetDirectory is a path parameter
+ Having it as a path parameter has several downsides as it
+ - is optional (which we only allow for query parameters)
+ - causes parsing issues, since the path is not reliably treated as a single parameter
+ - does not denote a hierarchy which path parameters are supposed to do
+ - interacts badly with the POST spec, as it would require the progress url to also contain the targetDirectory
+ */
+
+ return "/jobs/:jobid/savepoints";
+ }
+
+ public static SavepointTriggerHeaders getInstance() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7697a325/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerResponseBody.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerResponseBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerResponseBody.java
new file mode 100644
index 0000000..2b2dcb8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerResponseBody.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.savepoints;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Response to the triggering of a savepoint.
+ *
+ * <p>Serialized with the JSON fields {@code status}, {@code location} and {@code request-id}.
+ * The location must not be null (enforced in the constructor); status and request id are stored
+ * as given.
+ *
+ * <p>{@link #equals(Object)} and {@link #hashCode()} take only the location into account.
+ * NOTE(review): status and request id are ignored when comparing; confirm that this is intended.
+ */
+public class SavepointTriggerResponseBody implements ResponseBody {
+
+ private static final String FIELD_NAME_STATUS = "status";
+
+ private static final String FIELD_NAME_LOCATION = "location";
+
+ private static final String FIELD_NAME_REQUEST_ID = "request-id";
+
+ @JsonProperty(FIELD_NAME_STATUS)
+ public final String status;
+
+ @JsonProperty(FIELD_NAME_LOCATION)
+ public final String location;
+
+ @JsonProperty(FIELD_NAME_REQUEST_ID)
+ public final String requestId;
+
+ @JsonCreator
+ public SavepointTriggerResponseBody(
+ @JsonProperty(FIELD_NAME_STATUS) String status,
+ @JsonProperty(FIELD_NAME_LOCATION) String location,
+ @JsonProperty(FIELD_NAME_REQUEST_ID) String requestId) {
+ this.status = status;
+ this.location = Preconditions.checkNotNull(location);
+ this.requestId = requestId;
+ }
+
+ @Override
+ public int hashCode() {
+ return 79 * location.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (object instanceof SavepointTriggerResponseBody) {
+ SavepointTriggerResponseBody other = (SavepointTriggerResponseBody) object;
+ return this.location.equals(other.location);
+ }
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7697a325/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerResponseBodyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerResponseBodyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerResponseBodyTest.java
new file mode 100644
index 0000000..5f7f918
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerResponseBodyTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.savepoints;
+
+import org.apache.flink.runtime.rest.handler.legacy.messages.RestResponseMarshallingTestBase;
+
+/**
+ * Tests for the {@link SavepointTriggerResponseBody}.
+ *
+ * <p>Supplies the response class and a sample instance to
+ * {@link RestResponseMarshallingTestBase}; the actual test logic lives in that base class
+ * (presumably a marshalling round trip; confirm against the base class).
+ */
+public class SavepointTriggerResponseBodyTest extends RestResponseMarshallingTestBase<SavepointTriggerResponseBody> {
+
+ @Override
+ protected Class<SavepointTriggerResponseBody> getTestResponseClass() {
+ return SavepointTriggerResponseBody.class;
+ }
+
+ @Override
+ protected SavepointTriggerResponseBody getTestResponseInstance() throws Exception {
+ return new SavepointTriggerResponseBody("growing", "/universe", "big-bang");
+ }
+}
[2/5] flink git commit: [refactor] [tests] Generalize gateway mocking
in ClusterClientTest
Posted by ch...@apache.org.
[refactor] [tests] Generalize gateway mocking in ClusterClientTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c4430e67
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c4430e67
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c4430e67
Branch: refs/heads/master
Commit: c4430e67feae169f28407be6f292715be018da84
Parents: 593ce53
Author: zentol <ch...@apache.org>
Authored: Tue Oct 10 13:22:59 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Oct 11 11:56:59 2017 +0200
----------------------------------------------------------------------
.../flink/client/program/ClusterClientTest.java | 71 +++++++++++++-------
1 file changed, 47 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c4430e67/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
index 98c7d26..ad34864 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
@@ -133,50 +133,38 @@ public class ClusterClientTest extends TestLogger {
}
}
- private static class TestCancelActorGateway extends DummyActorGateway {
+ private static class TestCancelActorGateway extends TestActorGateway<JobManagerMessages.CancelJob, JobManagerMessages.CancellationSuccess> {
private final JobID expectedJobID;
- private volatile boolean messageArrived = false;
TestCancelActorGateway(JobID expectedJobID) {
+ super(JobManagerMessages.CancelJob.class);
this.expectedJobID = expectedJobID;
}
@Override
- public Future<Object> ask(Object message, FiniteDuration timeout) {
- messageArrived = true;
- if (message instanceof JobManagerMessages.CancelJob) {
- JobManagerMessages.CancelJob cancelJob = (JobManagerMessages.CancelJob) message;
- Assert.assertEquals(expectedJobID, cancelJob.jobID());
- return Future$.MODULE$.successful(new JobManagerMessages.CancellationSuccess(cancelJob.jobID(), null));
- }
- Assert.fail("Expected CancelJob message, got: " + message.getClass());
- return null;
+ public JobManagerMessages.CancellationSuccess process(JobManagerMessages.CancelJob message) {
+ Assert.assertEquals(expectedJobID, message.jobID());
+ return new JobManagerMessages.CancellationSuccess(message.jobID(), null);
}
}
- private static class TestCancelWithSavepointActorGateway extends DummyActorGateway {
+ private static class TestCancelWithSavepointActorGateway extends TestActorGateway<JobManagerMessages.CancelJobWithSavepoint, JobManagerMessages.CancellationSuccess> {
private final JobID expectedJobID;
private final String expectedTargetDirectory;
- private volatile boolean messageArrived = false;
TestCancelWithSavepointActorGateway(JobID expectedJobID, String expectedTargetDirectory) {
+ super(JobManagerMessages.CancelJobWithSavepoint.class);
this.expectedJobID = expectedJobID;
this.expectedTargetDirectory = expectedTargetDirectory;
}
@Override
- public Future<Object> ask(Object message, FiniteDuration timeout) {
- messageArrived = true;
- if (message instanceof JobManagerMessages.CancelJobWithSavepoint) {
- JobManagerMessages.CancelJobWithSavepoint cancelJob = (JobManagerMessages.CancelJobWithSavepoint) message;
- Assert.assertEquals(expectedJobID, cancelJob.jobID());
- Assert.assertEquals(expectedTargetDirectory, cancelJob.savepointDirectory());
- return Future$.MODULE$.successful(new JobManagerMessages.CancellationSuccess(cancelJob.jobID(), null));
- }
- Assert.fail("Expected CancelJobWithSavepoint message, got: " + message.getClass());
- return null;
+ public JobManagerMessages.CancellationSuccess process(JobManagerMessages.CancelJobWithSavepoint message) {
+ Assert.assertEquals(expectedJobID, message.jobID());
+ Assert.assertEquals(expectedTargetDirectory, message.savepointDirectory());
+ return new JobManagerMessages.CancellationSuccess(message.jobID(), null);
}
}
@@ -184,7 +172,7 @@ public class ClusterClientTest extends TestLogger {
private final ActorGateway jobmanagerGateway;
- public TestClusterClient(Configuration config, ActorGateway jobmanagerGateway) throws Exception {
+ TestClusterClient(Configuration config, ActorGateway jobmanagerGateway) throws Exception {
super(config);
this.jobmanagerGateway = jobmanagerGateway;
}
@@ -194,4 +182,39 @@ public class ClusterClientTest extends TestLogger {
return jobmanagerGateway;
}
}
+
+ /**
+ * Utility class for hiding akka/scala details.
+ *
+ * <p>Every message passed to {@link #ask(Object, FiniteDuration)} sets {@code messageArrived}.
+ * Messages that are instances of the expected message class are handed to
+ * {@link #process(Object)} and the result is wrapped in a successful future; any other message
+ * fails the test.
+ *
+ * @param <M> expected type of incoming requests
+ * @param <R> type of outgoing responses
+ */
+ private abstract static class TestActorGateway<M, R> extends DummyActorGateway {
+ private final Class<M> messageClass;
+ volatile boolean messageArrived = false;
+
+ TestActorGateway(Class<M> messageClass) {
+ this.messageClass = messageClass;
+ }
+
+ @Override
+ public Future<Object> ask(Object message, FiniteDuration timeout) {
+ messageArrived = true;
+ // the expected class must be the (super) type of the message, not the other way around
+ if (messageClass.isInstance(message)) {
+ return Future$.MODULE$.successful(process(messageClass.cast(message)));
+ }
+ // getName() rather than getSimpleName(), which can fail for nested Scala classes
+ Assert.fail("Expected " + messageClass.getName() + " message, got: " + message.getClass());
+ return null;
+ }
+
+ /**
+ * Processes the incoming message and verifies its correctness. Implementations may directly throw unchecked
+ * exceptions (like JUnit asserts) in case of errors or faulty behaviors.
+ *
+ * @param message incoming message
+ * @return response in case of success
+ */
+ public abstract R process(M message);
+ }
}
[3/5] flink git commit: [refactor] [tests] Refactor CliFrontend
mocking into utility class
Posted by ch...@apache.org.
[refactor] [tests] Refactor CliFrontend mocking into utility class
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/593ce53f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/593ce53f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/593ce53f
Branch: refs/heads/master
Commit: 593ce53f57412b714cd16c5b677dc4726c0b5eff
Parents: 367e430
Author: zentol <ch...@apache.org>
Authored: Mon Oct 9 13:06:06 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Oct 11 11:56:59 2017 +0200
----------------------------------------------------------------------
.../flink/client/CliFrontendListCancelTest.java | 20 +-------
.../flink/client/CliFrontendStopTest.java | 21 +-------
.../flink/client/util/MockedCliFrontend.java | 54 ++++++++++++++++++++
3 files changed, 58 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/593ce53f/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
index e52dde1..b01d162 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
@@ -22,10 +22,8 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.client.cli.CancelOptions;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.CommandLineOptions;
-import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.client.cli.Flip6DefaultCLI;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.client.util.MockedCliFrontend;
import org.apache.flink.runtime.akka.FlinkUntypedActor;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
@@ -36,7 +34,6 @@ import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.Status;
import akka.testkit.JavaTestKit;
-import org.apache.commons.cli.CommandLine;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -55,8 +52,6 @@ import static org.mockito.Matchers.isNull;
import static org.mockito.Matchers.notNull;
import static org.mockito.Mockito.times;
import static org.powermock.api.mockito.PowerMockito.doThrow;
-import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.when;
/**
* Tests for the CANCEL and LIST commands.
@@ -220,25 +215,14 @@ public class CliFrontendListCancelTest {
}
}
- private static final class CancelTestCliFrontend extends CliFrontend {
- private final ClusterClient client;
+ private static final class CancelTestCliFrontend extends MockedCliFrontend {
CancelTestCliFrontend(boolean reject) throws Exception {
- super(CliFrontendTestUtils.getConfigDir());
- this.client = mock(ClusterClient.class);
if (reject) {
doThrow(new IllegalArgumentException("Test exception")).when(client).cancel(any(JobID.class));
doThrow(new IllegalArgumentException("Test exception")).when(client).cancelWithSavepoint(any(JobID.class), anyString());
}
}
-
- @Override
- public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine) {
- CustomCommandLine ccl = mock(CustomCommandLine.class);
- when(ccl.retrieveCluster(any(CommandLine.class), any(Configuration.class), anyString()))
- .thenReturn(client);
- return ccl;
- }
}
private static final class InfoListTestCliFrontend extends CliFrontend {
http://git-wip-us.apache.org/repos/asf/flink/blob/593ce53f/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
index ab81713..d10b31c 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
@@ -20,14 +20,11 @@ package org.apache.flink.client;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.cli.CliFrontendParser;
-import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.client.cli.Flip6DefaultCLI;
import org.apache.flink.client.cli.StopOptions;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.client.util.MockedCliFrontend;
import org.apache.flink.util.TestLogger;
-import org.apache.commons.cli.CommandLine;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
@@ -36,11 +33,8 @@ import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.times;
import static org.powermock.api.mockito.PowerMockito.doThrow;
-import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.when;
/**
* Tests for the STOP command.
@@ -105,23 +99,12 @@ public class CliFrontendStopTest extends TestLogger {
}
}
- private static final class StopTestCliFrontend extends CliFrontend {
- private final ClusterClient client;
+ private static final class StopTestCliFrontend extends MockedCliFrontend {
StopTestCliFrontend(boolean reject) throws Exception {
- super(CliFrontendTestUtils.getConfigDir());
- this.client = mock(ClusterClient.class);
if (reject) {
doThrow(new IllegalArgumentException("Test exception")).when(client).stop(any(JobID.class));
}
}
-
- @Override
- public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine) {
- CustomCommandLine ccl = mock(CustomCommandLine.class);
- when(ccl.retrieveCluster(any(CommandLine.class), any(Configuration.class), anyString()))
- .thenReturn(client);
- return ccl;
- }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/593ce53f/flink-clients/src/test/java/org/apache/flink/client/util/MockedCliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/util/MockedCliFrontend.java b/flink-clients/src/test/java/org/apache/flink/client/util/MockedCliFrontend.java
new file mode 100644
index 0000000..c121c25
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/util/MockedCliFrontend.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.util;
+
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.CliFrontendTestUtils;
+import org.apache.flink.client.cli.CustomCommandLine;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+
+import org.apache.commons.cli.CommandLine;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Utility class for mocking the {@link ClusterClient} within a {@link CliFrontend}.
+ *
+ * <p>The mocking behavior can be defined in the constructor of the sub-class.
+ *
+ * <p>The mocked client is exposed through the public {@code client} field.
+ * {@link #getActiveCustomCommandLine(CommandLine)} returns a mocked {@link CustomCommandLine}
+ * whose {@code retrieveCluster} call yields that client, and a new mock is created on every call.
+ */
+public class MockedCliFrontend extends CliFrontend {
+ public final ClusterClient client;
+
+ protected MockedCliFrontend() throws Exception {
+ super(CliFrontendTestUtils.getConfigDir());
+ this.client = mock(ClusterClient.class);
+ }
+
+ @Override
+ public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine) {
+ CustomCommandLine ccl = mock(CustomCommandLine.class);
+ when(ccl.retrieveCluster(any(CommandLine.class), any(Configuration.class), anyString()))
+ .thenReturn(client);
+ return ccl;
+ }
+}