You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/10/11 11:53:55 UTC

[2/5] flink git commit: [refactor] [tests] Generalize gateway mocking in ClusterClientTest

[refactor] [tests] Generalize gateway mocking in ClusterClientTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c4430e67
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c4430e67
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c4430e67

Branch: refs/heads/master
Commit: c4430e67feae169f28407be6f292715be018da84
Parents: 593ce53
Author: zentol <ch...@apache.org>
Authored: Tue Oct 10 13:22:59 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Oct 11 11:56:59 2017 +0200

----------------------------------------------------------------------
 .../flink/client/program/ClusterClientTest.java | 71 +++++++++++++-------
 1 file changed, 47 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c4430e67/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
index 98c7d26..ad34864 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
@@ -133,50 +133,38 @@ public class ClusterClientTest extends TestLogger {
 		}
 	}
 
-	private static class TestCancelActorGateway extends DummyActorGateway {
+	private static class TestCancelActorGateway extends TestActorGateway<JobManagerMessages.CancelJob, JobManagerMessages.CancellationSuccess> {
 
 		private final JobID expectedJobID;
-		private volatile boolean messageArrived = false;
 
 		TestCancelActorGateway(JobID expectedJobID) {
+			super(JobManagerMessages.CancelJob.class);
 			this.expectedJobID = expectedJobID;
 		}
 
 		@Override
-		public Future<Object> ask(Object message, FiniteDuration timeout) {
-			messageArrived = true;
-			if (message instanceof JobManagerMessages.CancelJob) {
-				JobManagerMessages.CancelJob cancelJob = (JobManagerMessages.CancelJob) message;
-				Assert.assertEquals(expectedJobID, cancelJob.jobID());
-				return Future$.MODULE$.successful(new JobManagerMessages.CancellationSuccess(cancelJob.jobID(), null));
-			}
-			Assert.fail("Expected CancelJob message, got: " + message.getClass());
-			return null;
+		public JobManagerMessages.CancellationSuccess process(JobManagerMessages.CancelJob message) {
+			Assert.assertEquals(expectedJobID, message.jobID());
+			return new JobManagerMessages.CancellationSuccess(message.jobID(), null);
 		}
 	}
 
-	private static class TestCancelWithSavepointActorGateway extends DummyActorGateway {
+	private static class TestCancelWithSavepointActorGateway extends TestActorGateway<JobManagerMessages.CancelJobWithSavepoint, JobManagerMessages.CancellationSuccess> {
 
 		private final JobID expectedJobID;
 		private final String expectedTargetDirectory;
-		private volatile boolean messageArrived = false;
 
 		TestCancelWithSavepointActorGateway(JobID expectedJobID, String expectedTargetDirectory) {
+			super(JobManagerMessages.CancelJobWithSavepoint.class);
 			this.expectedJobID = expectedJobID;
 			this.expectedTargetDirectory = expectedTargetDirectory;
 		}
 
 		@Override
-		public Future<Object> ask(Object message, FiniteDuration timeout) {
-			messageArrived = true;
-			if (message instanceof JobManagerMessages.CancelJobWithSavepoint) {
-				JobManagerMessages.CancelJobWithSavepoint cancelJob = (JobManagerMessages.CancelJobWithSavepoint) message;
-				Assert.assertEquals(expectedJobID, cancelJob.jobID());
-				Assert.assertEquals(expectedTargetDirectory, cancelJob.savepointDirectory());
-				return Future$.MODULE$.successful(new JobManagerMessages.CancellationSuccess(cancelJob.jobID(), null));
-			}
-			Assert.fail("Expected CancelJobWithSavepoint message, got: " + message.getClass());
-			return null;
+		public JobManagerMessages.CancellationSuccess process(JobManagerMessages.CancelJobWithSavepoint message) {
+			Assert.assertEquals(expectedJobID, message.jobID());
+			Assert.assertEquals(expectedTargetDirectory, message.savepointDirectory());
+			return new JobManagerMessages.CancellationSuccess(message.jobID(), null);
 		}
 	}
 
@@ -184,7 +172,7 @@ public class ClusterClientTest extends TestLogger {
 
 		private final ActorGateway jobmanagerGateway;
 
-		public TestClusterClient(Configuration config, ActorGateway jobmanagerGateway) throws Exception {
+		TestClusterClient(Configuration config, ActorGateway jobmanagerGateway) throws Exception {
 			super(config);
 			this.jobmanagerGateway = jobmanagerGateway;
 		}
@@ -194,4 +182,39 @@ public class ClusterClientTest extends TestLogger {
 			return jobmanagerGateway;
 		}
 	}
+
+	/**
+	 * Utility class for hiding akka/scala details.
+	 *
+	 * @param <M> expected type of incoming requests
+	 * @param <R> type of outgoing requests
+	 */
+	private abstract static class TestActorGateway<M, R> extends DummyActorGateway {
+		private final Class<M> messageClass;
+		volatile boolean messageArrived = false;
+
+		TestActorGateway(Class<M> messageClass) {
+			this.messageClass = messageClass;
+		}
+
+		@Override
+		@SuppressWarnings("unchecked")
+		public Future<Object> ask(Object message, FiniteDuration timeout) {
+			messageArrived = true;
+			if (message.getClass().isAssignableFrom(messageClass)) {
+				return Future$.MODULE$.successful(process((M) message));
+			}
+			Assert.fail("Expected TriggerSavepoint message, got: " + message.getClass());
+			return null;
+		}
+
+		/**
+		 * Processes the incoming message and verifies it's correctness. Implementations may directly throw unchecked
+		 * exceptions (like JUnit asserts) in case of errors or faulty behaviors.
+		 *
+		 * @param message incoming message
+		 * @return response in case of success
+		 */
+		public abstract R process(M message);
+	}
 }