You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/14 19:51:21 UTC

[flink] 01/02: [hotfix] Replace DispatcherResourceCleanupTest#TestingJobManagerRunnerFactory with the top-level TestingJobManagerRunnerFactory

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 89ddffee2ab396b0beaadc694ab1f765852c8f64
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Sep 14 11:42:11 2018 +0200

    [hotfix] Replace DispatcherResourceCleanupTest#TestingJobManagerRunnerFactory with the top-level TestingJobManagerRunnerFactory
---
 .../flink/runtime/dispatcher/DispatcherHATest.java |  2 +-
 .../dispatcher/DispatcherResourceCleanupTest.java  | 33 +---------------------
 .../runtime/dispatcher/MiniDispatcherTest.java     |  2 +-
 .../dispatcher/TestingJobManagerRunnerFactory.java |  8 ++++--
 .../dispatcher/ZooKeeperHADispatcherTest.java      | 10 +++----
 5 files changed, 13 insertions(+), 42 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
index 335199a..c825451 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -163,7 +163,7 @@ public class DispatcherHATest extends TestLogger {
 			UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
 			null,
 			new MemoryArchivedExecutionGraphStore(),
-			new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>()),
+			new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>(), CompletableFuture.completedFuture(null)),
 			testingFatalErrorHandler,
 			fencingTokens);
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index d09ab8d..5c4ac34 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -29,26 +29,19 @@ import org.apache.flink.runtime.blob.BlobStore;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.blob.TestingBlobStore;
 import org.apache.flink.runtime.blob.TestingBlobStoreBuilder;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmaster.JobManagerRunner;
-import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
 import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
-import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
@@ -79,8 +72,6 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * Tests the resource cleanup by the {@link Dispatcher}.
@@ -188,7 +179,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 			UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
 			null,
 			new MemoryArchivedExecutionGraphStore(),
-			new TestingJobManagerRunnerFactory(resultFuture, terminationFuture),
+			new TestingJobManagerRunnerFactory(new CompletableFuture<>(), resultFuture, terminationFuture),
 			fatalErrorHandler);
 
 		dispatcher.start();
@@ -447,28 +438,6 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 		assertThat(deleteAllHABlobsFuture.get(), equalTo(jobId));
 	}
 
-	private static final class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFactory {
-
-		private final CompletableFuture<ArchivedExecutionGraph> resultFuture;
-
-		private final CompletableFuture<Void> terminationFuture;
-
-		private TestingJobManagerRunnerFactory(CompletableFuture<ArchivedExecutionGraph> resultFuture, CompletableFuture<Void> terminationFuture) {
-			this.resultFuture = resultFuture;
-			this.terminationFuture = terminationFuture;
-		}
-
-		@Override
-		public JobManagerRunner createJobManagerRunner(ResourceID resourceId, JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, BlobServer blobServer, JobManagerSharedServices jobManagerServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) {
-			final JobManagerRunner jobManagerRunnerMock = mock(JobManagerRunner.class);
-
-			when(jobManagerRunnerMock.getResultFuture()).thenReturn(resultFuture);
-			when(jobManagerRunnerMock.closeAsync()).thenReturn(terminationFuture);
-
-			return jobManagerRunnerMock;
-		}
-	}
-
 	private static final class TestingBlobServer extends BlobServer {
 
 		private final CompletableFuture<JobID> cleanupJobFuture;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index 39c06f3..eed23ed 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -125,7 +125,7 @@ public class MiniDispatcherTest extends TestLogger {
 		jobGraphFuture = new CompletableFuture<>();
 		resultFuture = new CompletableFuture<>();
 
-		testingJobManagerRunnerFactory = new TestingJobManagerRunnerFactory(jobGraphFuture, resultFuture);
+		testingJobManagerRunnerFactory = new TestingJobManagerRunnerFactory(jobGraphFuture, resultFuture, CompletableFuture.completedFuture(null));
 	}
 
 	@After
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
index f9be888..cb48648 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
@@ -44,10 +44,12 @@ final class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunne
 
 	private final CompletableFuture<JobGraph> jobGraphFuture;
 	private final CompletableFuture<ArchivedExecutionGraph> resultFuture;
+	private final CompletableFuture<Void> terminationFuture;
 
-	TestingJobManagerRunnerFactory(CompletableFuture<JobGraph> jobGraphFuture, CompletableFuture<ArchivedExecutionGraph> resultFuture) {
+	TestingJobManagerRunnerFactory(CompletableFuture<JobGraph> jobGraphFuture, CompletableFuture<ArchivedExecutionGraph> resultFuture, CompletableFuture<Void> terminationFuture) {
 		this.jobGraphFuture = jobGraphFuture;
 		this.resultFuture = resultFuture;
+		this.terminationFuture = terminationFuture;
 	}
 
 	@Override
@@ -61,12 +63,12 @@ final class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunne
 			BlobServer blobServer,
 			JobManagerSharedServices jobManagerSharedServices,
 			JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
-			FatalErrorHandler fatalErrorHandler) throws Exception {
+			FatalErrorHandler fatalErrorHandler) {
 		jobGraphFuture.complete(jobGraph);
 
 		final JobManagerRunner mock = mock(JobManagerRunner.class);
 		when(mock.getResultFuture()).thenReturn(resultFuture);
-		when(mock.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+		when(mock.closeAsync()).thenReturn(terminationFuture);
 
 		return mock;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
index b5662c0..9c23f9d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
@@ -154,7 +154,7 @@ public class ZooKeeperHADispatcherTest extends TestLogger {
 
 			final TestingDispatcher dispatcher = createDispatcher(
 				testingHighAvailabilityServices,
-				new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>()));
+				new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>(), CompletableFuture.completedFuture(null)));
 
 			dispatcher.start();
 
@@ -223,11 +223,11 @@ public class ZooKeeperHADispatcherTest extends TestLogger {
 			final CompletableFuture<ArchivedExecutionGraph> resultFuture = new CompletableFuture<>();
 			final TestingDispatcher dispatcher1 = createDispatcher(
 				haServices1,
-				new TestingJobManagerRunnerFactory(jobGraphFuture, resultFuture));
+				new TestingJobManagerRunnerFactory(jobGraphFuture, resultFuture, CompletableFuture.completedFuture(null)));
 
 			final TestingDispatcher dispatcher2 = createDispatcher(
 				haServices2,
-				new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>()));
+				new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>(), CompletableFuture.completedFuture(null)));
 
 			try {
 				dispatcher1.start();
@@ -285,11 +285,11 @@ public class ZooKeeperHADispatcherTest extends TestLogger {
 				final CompletableFuture<JobGraph> jobGraphFuture1 = new CompletableFuture<>();
 				dispatcher1 = createDispatcher(
 					haServices,
-					new TestingJobManagerRunnerFactory(jobGraphFuture1, new CompletableFuture<>()));
+					new TestingJobManagerRunnerFactory(jobGraphFuture1, new CompletableFuture<>(), CompletableFuture.completedFuture(null)));
 				final CompletableFuture<JobGraph> jobGraphFuture2 = new CompletableFuture<>();
 				dispatcher2 = createDispatcher(
 					haServices,
-					new TestingJobManagerRunnerFactory(jobGraphFuture2, new CompletableFuture<>()));
+					new TestingJobManagerRunnerFactory(jobGraphFuture2, new CompletableFuture<>(), CompletableFuture.completedFuture(null)));
 
 				dispatcher1.start();
 				dispatcher2.start();