You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/14 19:51:21 UTC
[flink] 01/02: [hotfix] Replace
DispatcherResourceCleanupTest#TestingJobManagerRunnerFactory with
TestingJobManagerRunnerFactory
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 89ddffee2ab396b0beaadc694ab1f765852c8f64
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Sep 14 11:42:11 2018 +0200
[hotfix] Replace DispatcherResourceCleanupTest#TestingJobManagerRunnerFactory with TestingJobManagerRunnerFactory
---
.../flink/runtime/dispatcher/DispatcherHATest.java | 2 +-
.../dispatcher/DispatcherResourceCleanupTest.java | 33 +---------------------
.../runtime/dispatcher/MiniDispatcherTest.java | 2 +-
.../dispatcher/TestingJobManagerRunnerFactory.java | 8 ++++--
.../dispatcher/ZooKeeperHADispatcherTest.java | 10 +++----
5 files changed, 13 insertions(+), 42 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
index 335199a..c825451 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -163,7 +163,7 @@ public class DispatcherHATest extends TestLogger {
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
null,
new MemoryArchivedExecutionGraphStore(),
- new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>()),
+ new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>(), CompletableFuture.completedFuture(null)),
testingFatalErrorHandler,
fencingTokens);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index d09ab8d..5c4ac34 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -29,26 +29,19 @@ import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.TestingBlobStore;
import org.apache.flink.runtime.blob.TestingBlobStoreBuilder;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmaster.JobManagerRunner;
-import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
-import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
@@ -79,8 +72,6 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
/**
* Tests the resource cleanup by the {@link Dispatcher}.
@@ -188,7 +179,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
null,
new MemoryArchivedExecutionGraphStore(),
- new TestingJobManagerRunnerFactory(resultFuture, terminationFuture),
+ new TestingJobManagerRunnerFactory(new CompletableFuture<>(), resultFuture, terminationFuture),
fatalErrorHandler);
dispatcher.start();
@@ -447,28 +438,6 @@ public class DispatcherResourceCleanupTest extends TestLogger {
assertThat(deleteAllHABlobsFuture.get(), equalTo(jobId));
}
- private static final class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFactory {
-
- private final CompletableFuture<ArchivedExecutionGraph> resultFuture;
-
- private final CompletableFuture<Void> terminationFuture;
-
- private TestingJobManagerRunnerFactory(CompletableFuture<ArchivedExecutionGraph> resultFuture, CompletableFuture<Void> terminationFuture) {
- this.resultFuture = resultFuture;
- this.terminationFuture = terminationFuture;
- }
-
- @Override
- public JobManagerRunner createJobManagerRunner(ResourceID resourceId, JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, BlobServer blobServer, JobManagerSharedServices jobManagerServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) {
- final JobManagerRunner jobManagerRunnerMock = mock(JobManagerRunner.class);
-
- when(jobManagerRunnerMock.getResultFuture()).thenReturn(resultFuture);
- when(jobManagerRunnerMock.closeAsync()).thenReturn(terminationFuture);
-
- return jobManagerRunnerMock;
- }
- }
-
private static final class TestingBlobServer extends BlobServer {
private final CompletableFuture<JobID> cleanupJobFuture;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index 39c06f3..eed23ed 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -125,7 +125,7 @@ public class MiniDispatcherTest extends TestLogger {
jobGraphFuture = new CompletableFuture<>();
resultFuture = new CompletableFuture<>();
- testingJobManagerRunnerFactory = new TestingJobManagerRunnerFactory(jobGraphFuture, resultFuture);
+ testingJobManagerRunnerFactory = new TestingJobManagerRunnerFactory(jobGraphFuture, resultFuture, CompletableFuture.completedFuture(null));
}
@After
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
index f9be888..cb48648 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
@@ -44,10 +44,12 @@ final class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunne
private final CompletableFuture<JobGraph> jobGraphFuture;
private final CompletableFuture<ArchivedExecutionGraph> resultFuture;
+ private final CompletableFuture<Void> terminationFuture;
- TestingJobManagerRunnerFactory(CompletableFuture<JobGraph> jobGraphFuture, CompletableFuture<ArchivedExecutionGraph> resultFuture) {
+ TestingJobManagerRunnerFactory(CompletableFuture<JobGraph> jobGraphFuture, CompletableFuture<ArchivedExecutionGraph> resultFuture, CompletableFuture<Void> terminationFuture) {
this.jobGraphFuture = jobGraphFuture;
this.resultFuture = resultFuture;
+ this.terminationFuture = terminationFuture;
}
@Override
@@ -61,12 +63,12 @@ final class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunne
BlobServer blobServer,
JobManagerSharedServices jobManagerSharedServices,
JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
- FatalErrorHandler fatalErrorHandler) throws Exception {
+ FatalErrorHandler fatalErrorHandler) {
jobGraphFuture.complete(jobGraph);
final JobManagerRunner mock = mock(JobManagerRunner.class);
when(mock.getResultFuture()).thenReturn(resultFuture);
- when(mock.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+ when(mock.closeAsync()).thenReturn(terminationFuture);
return mock;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
index b5662c0..9c23f9d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
@@ -154,7 +154,7 @@ public class ZooKeeperHADispatcherTest extends TestLogger {
final TestingDispatcher dispatcher = createDispatcher(
testingHighAvailabilityServices,
- new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>()));
+ new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>(), CompletableFuture.completedFuture(null)));
dispatcher.start();
@@ -223,11 +223,11 @@ public class ZooKeeperHADispatcherTest extends TestLogger {
final CompletableFuture<ArchivedExecutionGraph> resultFuture = new CompletableFuture<>();
final TestingDispatcher dispatcher1 = createDispatcher(
haServices1,
- new TestingJobManagerRunnerFactory(jobGraphFuture, resultFuture));
+ new TestingJobManagerRunnerFactory(jobGraphFuture, resultFuture, CompletableFuture.completedFuture(null)));
final TestingDispatcher dispatcher2 = createDispatcher(
haServices2,
- new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>()));
+ new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>(), CompletableFuture.completedFuture(null)));
try {
dispatcher1.start();
@@ -285,11 +285,11 @@ public class ZooKeeperHADispatcherTest extends TestLogger {
final CompletableFuture<JobGraph> jobGraphFuture1 = new CompletableFuture<>();
dispatcher1 = createDispatcher(
haServices,
- new TestingJobManagerRunnerFactory(jobGraphFuture1, new CompletableFuture<>()));
+ new TestingJobManagerRunnerFactory(jobGraphFuture1, new CompletableFuture<>(), CompletableFuture.completedFuture(null)));
final CompletableFuture<JobGraph> jobGraphFuture2 = new CompletableFuture<>();
dispatcher2 = createDispatcher(
haServices,
- new TestingJobManagerRunnerFactory(jobGraphFuture2, new CompletableFuture<>()));
+ new TestingJobManagerRunnerFactory(jobGraphFuture2, new CompletableFuture<>(), CompletableFuture.completedFuture(null)));
dispatcher1.start();
dispatcher2.start();