You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/06/14 16:07:57 UTC
[4/5] flink git commit: [hotfix] Make TestingDispatcher and
TestingJobManagerRunnerFactory standalone testing classes
[hotfix] Make TestingDispatcher and TestingJobManagerRunnerFactory standalone testing classes
Refactors the DispatcherTests and moves the TestingDispatcher and the TestingJobManagerRunnerFactory
to be top level classes. This makes it easier to reuse them.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef0a84e8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef0a84e8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef0a84e8
Branch: refs/heads/master
Commit: ef0a84e84ddedde4cdfdc726dd0e21b5e947fc94
Parents: cacdb68
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jun 12 14:26:15 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jun 14 15:28:03 2018 +0200
----------------------------------------------------------------------
.../DispatcherResourceCleanupTest.java | 26 +++++--
.../runtime/dispatcher/DispatcherTest.java | 44 ------------
.../runtime/dispatcher/MiniDispatcherTest.java | 41 -----------
.../runtime/dispatcher/TestingDispatcher.java | 75 ++++++++++++++++++++
.../TestingJobManagerRunnerFactory.java | 73 +++++++++++++++++++
5 files changed, 168 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ef0a84e8/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index f768251..71125e4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -150,6 +150,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
clearedJobLatch = new OneShotLatch();
runningJobsRegistry = new SingleRunningJobsRegistry(jobId, clearedJobLatch);
highAvailabilityServices.setRunningJobsRegistry(runningJobsRegistry);
+ highAvailabilityServices.setSubmittedJobGraphStore(new InMemorySubmittedJobGraphStore());
storedBlobFuture = new CompletableFuture<>();
deleteAllFuture = new CompletableFuture<>();
@@ -177,7 +178,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
Dispatcher.DISPATCHER_NAME + UUID.randomUUID(),
configuration,
highAvailabilityServices,
- new InMemorySubmittedJobGraphStore(),
+ highAvailabilityServices.getSubmittedJobGraphStore(),
new TestingResourceManagerGateway(),
blobServer,
new HeartbeatServices(1000L, 1000L),
@@ -185,9 +186,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
null,
new MemoryArchivedExecutionGraphStore(),
new TestingJobManagerRunnerFactory(resultFuture, CompletableFuture.completedFuture(null)),
- fatalErrorHandler,
- null,
- VoidHistoryServerArchivist.INSTANCE);
+ fatalErrorHandler);
dispatcher.start();
@@ -358,8 +357,23 @@ public class DispatcherResourceCleanupTest extends TestLogger {
}
private static final class TestingDispatcher extends Dispatcher {
- public TestingDispatcher(RpcService rpcService, String endpointId, Configuration configuration, HighAvailabilityServices highAvailabilityServices, SubmittedJobGraphStore submittedJobGraphStore, ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, JobManagerMetricGroup jobManagerMetricGroup, @Nullable String metricServiceQueryPath, ArchivedExecutionGraphStore archivedExecutionGraphStore, JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler, @Nullable String restAddress, HistoryServerArchivist historyServerArchivist) throws Exception {
- super(rpcService, endpointId, configuration, highAvailabilityServices, submittedJobGraphStore, resourceManagerGateway, blobServer, heartbeatServices, jobManagerMetricGroup, metricServiceQueryPath, archivedExecutionGraphStore, jobManagerRunnerFactory, fatalErrorHandler, restAddress, historyServerArchivist);
+ TestingDispatcher(RpcService rpcService, String endpointId, Configuration configuration, HighAvailabilityServices highAvailabilityServices, SubmittedJobGraphStore submittedJobGraphStore, ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, JobManagerMetricGroup jobManagerMetricGroup, @Nullable String metricServiceQueryPath, ArchivedExecutionGraphStore archivedExecutionGraphStore, JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler) throws Exception {
+ super(
+ rpcService,
+ endpointId,
+ configuration,
+ highAvailabilityServices,
+ submittedJobGraphStore,
+ resourceManagerGateway,
+ blobServer,
+ heartbeatServices,
+ jobManagerMetricGroup,
+ metricServiceQueryPath,
+ archivedExecutionGraphStore,
+ jobManagerRunnerFactory,
+ fatalErrorHandler,
+ null,
+ VoidHistoryServerArchivist.INSTANCE);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ef0a84e8/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index c068020..a25fe7a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.dispatcher;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.BlobServerOptions;
@@ -48,9 +47,7 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -79,7 +76,6 @@ import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
@@ -535,46 +531,6 @@ public class DispatcherTest extends TestLogger {
}
}
- private static class TestingDispatcher extends Dispatcher {
-
- private TestingDispatcher(
- RpcService rpcService,
- String endpointId,
- Configuration configuration,
- HighAvailabilityServices highAvailabilityServices,
- ResourceManagerGateway resourceManagerGateway,
- BlobServer blobServer,
- HeartbeatServices heartbeatServices,
- JobManagerMetricGroup jobManagerMetricGroup,
- @Nullable String metricQueryServicePath,
- ArchivedExecutionGraphStore archivedExecutionGraphStore,
- JobManagerRunnerFactory jobManagerRunnerFactory,
- FatalErrorHandler fatalErrorHandler) throws Exception {
- super(
- rpcService,
- endpointId,
- configuration,
- highAvailabilityServices,
- highAvailabilityServices.getSubmittedJobGraphStore(),
- resourceManagerGateway,
- blobServer,
- heartbeatServices,
- jobManagerMetricGroup,
- metricQueryServicePath,
- archivedExecutionGraphStore,
- jobManagerRunnerFactory,
- fatalErrorHandler,
- null,
- VoidHistoryServerArchivist.INSTANCE);
- }
-
- @VisibleForTesting
- void completeJobExecution(ArchivedExecutionGraph archivedExecutionGraph) {
- runAsync(
- () -> jobReachedGloballyTerminalState(archivedExecutionGraph));
- }
- }
-
private static final class ExpectedJobIdJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFactory {
private final JobID expectedJobId;
http://git-wip-us.apache.org/repos/asf/flink/blob/ef0a84e8/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index bfe6527..39c06f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -23,24 +23,17 @@ import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.VoidBlobStore;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobmaster.JobManagerRunner;
-import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
@@ -65,8 +58,6 @@ import java.util.concurrent.TimeoutException;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
/**
* Tests for the {@link MiniDispatcher}.
@@ -260,36 +251,4 @@ public class MiniDispatcherTest extends TestLogger {
executionMode);
}
- private static final class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFactory {
-
- private final CompletableFuture<JobGraph> jobGraphFuture;
- private final CompletableFuture<ArchivedExecutionGraph> resultFuture;
-
- private TestingJobManagerRunnerFactory(CompletableFuture<JobGraph> jobGraphFuture, CompletableFuture<ArchivedExecutionGraph> resultFuture) {
- this.jobGraphFuture = jobGraphFuture;
- this.resultFuture = resultFuture;
- }
-
- @Override
- public JobManagerRunner createJobManagerRunner(
- ResourceID resourceId,
- JobGraph jobGraph,
- Configuration configuration,
- RpcService rpcService,
- HighAvailabilityServices highAvailabilityServices,
- HeartbeatServices heartbeatServices,
- BlobServer blobServer,
- JobManagerSharedServices jobManagerSharedServices,
- JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
- FatalErrorHandler fatalErrorHandler) throws Exception {
- jobGraphFuture.complete(jobGraph);
-
- final JobManagerRunner mock = mock(JobManagerRunner.class);
- when(mock.getResultFuture()).thenReturn(resultFuture);
- when(mock.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
-
- return mock;
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ef0a84e8/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
new file mode 100644
index 0000000..f5091ea
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link Dispatcher} implementation used for testing purposes.
+ */
+class TestingDispatcher extends Dispatcher {
+
+ TestingDispatcher(
+ RpcService rpcService,
+ String endpointId,
+ Configuration configuration,
+ HighAvailabilityServices highAvailabilityServices,
+ ResourceManagerGateway resourceManagerGateway,
+ BlobServer blobServer,
+ HeartbeatServices heartbeatServices,
+ JobManagerMetricGroup jobManagerMetricGroup,
+ @Nullable String metricQueryServicePath,
+ ArchivedExecutionGraphStore archivedExecutionGraphStore,
+ JobManagerRunnerFactory jobManagerRunnerFactory,
+ FatalErrorHandler fatalErrorHandler) throws Exception {
+ super(
+ rpcService,
+ endpointId,
+ configuration,
+ highAvailabilityServices,
+ highAvailabilityServices.getSubmittedJobGraphStore(),
+ resourceManagerGateway,
+ blobServer,
+ heartbeatServices,
+ jobManagerMetricGroup,
+ metricQueryServicePath,
+ archivedExecutionGraphStore,
+ jobManagerRunnerFactory,
+ fatalErrorHandler,
+ null,
+ VoidHistoryServerArchivist.INSTANCE);
+ }
+
+ @VisibleForTesting
+ void completeJobExecution(ArchivedExecutionGraph archivedExecutionGraph) {
+ runAsync(
+ () -> jobReachedGloballyTerminalState(archivedExecutionGraph));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ef0a84e8/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
new file mode 100644
index 0000000..f9be888
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
+import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * {@link org.apache.flink.runtime.dispatcher.Dispatcher.JobManagerRunnerFactory} implementation for
+ * testing purposes.
+ */
+final class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFactory {
+
+ private final CompletableFuture<JobGraph> jobGraphFuture;
+ private final CompletableFuture<ArchivedExecutionGraph> resultFuture;
+
+ TestingJobManagerRunnerFactory(CompletableFuture<JobGraph> jobGraphFuture, CompletableFuture<ArchivedExecutionGraph> resultFuture) {
+ this.jobGraphFuture = jobGraphFuture;
+ this.resultFuture = resultFuture;
+ }
+
+ @Override
+ public JobManagerRunner createJobManagerRunner(
+ ResourceID resourceId,
+ JobGraph jobGraph,
+ Configuration configuration,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ HeartbeatServices heartbeatServices,
+ BlobServer blobServer,
+ JobManagerSharedServices jobManagerSharedServices,
+ JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
+ FatalErrorHandler fatalErrorHandler) throws Exception {
+ jobGraphFuture.complete(jobGraph);
+
+ final JobManagerRunner mock = mock(JobManagerRunner.class);
+ when(mock.getResultFuture()).thenReturn(resultFuture);
+ when(mock.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+
+ return mock;
+ }
+}