You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/10/11 11:53:55 UTC
[2/5] flink git commit: [refactor] [tests] Generalize gateway mocking
in ClusterClientTest
[refactor] [tests] Generalize gateway mocking in ClusterClientTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c4430e67
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c4430e67
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c4430e67
Branch: refs/heads/master
Commit: c4430e67feae169f28407be6f292715be018da84
Parents: 593ce53
Author: zentol <ch...@apache.org>
Authored: Tue Oct 10 13:22:59 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Oct 11 11:56:59 2017 +0200
----------------------------------------------------------------------
.../flink/client/program/ClusterClientTest.java | 71 +++++++++++++-------
1 file changed, 47 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c4430e67/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
index 98c7d26..ad34864 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
@@ -133,50 +133,38 @@ public class ClusterClientTest extends TestLogger {
}
}
- private static class TestCancelActorGateway extends DummyActorGateway {
+ private static class TestCancelActorGateway extends TestActorGateway<JobManagerMessages.CancelJob, JobManagerMessages.CancellationSuccess> {
private final JobID expectedJobID;
- private volatile boolean messageArrived = false;
TestCancelActorGateway(JobID expectedJobID) {
+ super(JobManagerMessages.CancelJob.class);
this.expectedJobID = expectedJobID;
}
@Override
- public Future<Object> ask(Object message, FiniteDuration timeout) {
- messageArrived = true;
- if (message instanceof JobManagerMessages.CancelJob) {
- JobManagerMessages.CancelJob cancelJob = (JobManagerMessages.CancelJob) message;
- Assert.assertEquals(expectedJobID, cancelJob.jobID());
- return Future$.MODULE$.successful(new JobManagerMessages.CancellationSuccess(cancelJob.jobID(), null));
- }
- Assert.fail("Expected CancelJob message, got: " + message.getClass());
- return null;
+ public JobManagerMessages.CancellationSuccess process(JobManagerMessages.CancelJob message) {
+ Assert.assertEquals(expectedJobID, message.jobID());
+ return new JobManagerMessages.CancellationSuccess(message.jobID(), null);
}
}
- private static class TestCancelWithSavepointActorGateway extends DummyActorGateway {
+ private static class TestCancelWithSavepointActorGateway extends TestActorGateway<JobManagerMessages.CancelJobWithSavepoint, JobManagerMessages.CancellationSuccess> {
private final JobID expectedJobID;
private final String expectedTargetDirectory;
- private volatile boolean messageArrived = false;
TestCancelWithSavepointActorGateway(JobID expectedJobID, String expectedTargetDirectory) {
+ super(JobManagerMessages.CancelJobWithSavepoint.class);
this.expectedJobID = expectedJobID;
this.expectedTargetDirectory = expectedTargetDirectory;
}
@Override
- public Future<Object> ask(Object message, FiniteDuration timeout) {
- messageArrived = true;
- if (message instanceof JobManagerMessages.CancelJobWithSavepoint) {
- JobManagerMessages.CancelJobWithSavepoint cancelJob = (JobManagerMessages.CancelJobWithSavepoint) message;
- Assert.assertEquals(expectedJobID, cancelJob.jobID());
- Assert.assertEquals(expectedTargetDirectory, cancelJob.savepointDirectory());
- return Future$.MODULE$.successful(new JobManagerMessages.CancellationSuccess(cancelJob.jobID(), null));
- }
- Assert.fail("Expected CancelJobWithSavepoint message, got: " + message.getClass());
- return null;
+ public JobManagerMessages.CancellationSuccess process(JobManagerMessages.CancelJobWithSavepoint message) {
+ Assert.assertEquals(expectedJobID, message.jobID());
+ Assert.assertEquals(expectedTargetDirectory, message.savepointDirectory());
+ return new JobManagerMessages.CancellationSuccess(message.jobID(), null);
}
}
@@ -184,7 +172,7 @@ public class ClusterClientTest extends TestLogger {
private final ActorGateway jobmanagerGateway;
- public TestClusterClient(Configuration config, ActorGateway jobmanagerGateway) throws Exception {
+ TestClusterClient(Configuration config, ActorGateway jobmanagerGateway) throws Exception {
super(config);
this.jobmanagerGateway = jobmanagerGateway;
}
@@ -194,4 +182,39 @@ public class ClusterClientTest extends TestLogger {
return jobmanagerGateway;
}
}
+
+ /**
+ * Utility class for hiding akka/scala details.
+ *
+ * @param <M> expected type of incoming requests
+ * @param <R> type of outgoing responses
+ */
+ private abstract static class TestActorGateway<M, R> extends DummyActorGateway {
+ private final Class<M> messageClass;
+ volatile boolean messageArrived = false;
+
+ TestActorGateway(Class<M> messageClass) {
+ this.messageClass = messageClass;
+ }
+
+ /** Passes messages of the expected type (or a subtype) to {@link #process}; fails the test for anything else, including null. */
+ @Override
+ public Future<Object> ask(Object message, FiniteDuration timeout) {
+ messageArrived = true;
+ if (messageClass.isInstance(message)) {
+ return Future$.MODULE$.successful(process(messageClass.cast(message)));
+ }
+ Assert.fail("Expected " + messageClass.getSimpleName() + " message, got: " + (message == null ? null : message.getClass()));
+ return null;
+ }
+
+ /**
+ * Processes the incoming message and verifies its correctness. Implementations may directly throw unchecked
+ * exceptions (like JUnit asserts) in case of errors or faulty behaviors.
+ *
+ * @param message incoming message
+ * @return response in case of success
+ */
+ public abstract R process(M message);
+ }
}