You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/06/14 16:07:57 UTC

[4/5] flink git commit: [hotfix] Make TestingDispatcher and TestingJobManagerRunnerFactory standalone testing classes

[hotfix] Make TestingDispatcher and TestingJobManagerRunnerFactory standalone testing classes

Refactors the dispatcher tests and moves the TestingDispatcher and the TestingJobManagerRunnerFactory
into standalone top-level classes, which makes them easier to reuse from other tests.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef0a84e8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef0a84e8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef0a84e8

Branch: refs/heads/master
Commit: ef0a84e84ddedde4cdfdc726dd0e21b5e947fc94
Parents: cacdb68
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jun 12 14:26:15 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jun 14 15:28:03 2018 +0200

----------------------------------------------------------------------
 .../DispatcherResourceCleanupTest.java          | 26 +++++--
 .../runtime/dispatcher/DispatcherTest.java      | 44 ------------
 .../runtime/dispatcher/MiniDispatcherTest.java  | 41 -----------
 .../runtime/dispatcher/TestingDispatcher.java   | 75 ++++++++++++++++++++
 .../TestingJobManagerRunnerFactory.java         | 73 +++++++++++++++++++
 5 files changed, 168 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ef0a84e8/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index f768251..71125e4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -150,6 +150,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 		clearedJobLatch = new OneShotLatch();
 		runningJobsRegistry = new SingleRunningJobsRegistry(jobId, clearedJobLatch);
 		highAvailabilityServices.setRunningJobsRegistry(runningJobsRegistry);
+		highAvailabilityServices.setSubmittedJobGraphStore(new InMemorySubmittedJobGraphStore());
 
 		storedBlobFuture = new CompletableFuture<>();
 		deleteAllFuture = new CompletableFuture<>();
@@ -177,7 +178,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 			Dispatcher.DISPATCHER_NAME + UUID.randomUUID(),
 			configuration,
 			highAvailabilityServices,
-			new InMemorySubmittedJobGraphStore(),
+			highAvailabilityServices.getSubmittedJobGraphStore(),
 			new TestingResourceManagerGateway(),
 			blobServer,
 			new HeartbeatServices(1000L, 1000L),
@@ -185,9 +186,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 			null,
 			new MemoryArchivedExecutionGraphStore(),
 			new TestingJobManagerRunnerFactory(resultFuture, CompletableFuture.completedFuture(null)),
-			fatalErrorHandler,
-			null,
-			VoidHistoryServerArchivist.INSTANCE);
+			fatalErrorHandler);
 
 		dispatcher.start();
 
@@ -358,8 +357,23 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 	}
 
 	private static final class TestingDispatcher extends Dispatcher {
-		public TestingDispatcher(RpcService rpcService, String endpointId, Configuration configuration, HighAvailabilityServices highAvailabilityServices, SubmittedJobGraphStore submittedJobGraphStore, ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, JobManagerMetricGroup jobManagerMetricGroup, @Nullable String metricServiceQueryPath, ArchivedExecutionGraphStore archivedExecutionGraphStore, JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler, @Nullable String restAddress, HistoryServerArchivist historyServerArchivist) throws Exception {
-			super(rpcService, endpointId, configuration, highAvailabilityServices, submittedJobGraphStore, resourceManagerGateway, blobServer, heartbeatServices, jobManagerMetricGroup, metricServiceQueryPath, archivedExecutionGraphStore, jobManagerRunnerFactory, fatalErrorHandler, restAddress, historyServerArchivist);
+		TestingDispatcher(RpcService rpcService, String endpointId, Configuration configuration, HighAvailabilityServices highAvailabilityServices, SubmittedJobGraphStore submittedJobGraphStore, ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, JobManagerMetricGroup jobManagerMetricGroup, @Nullable String metricServiceQueryPath, ArchivedExecutionGraphStore archivedExecutionGraphStore, JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler) throws Exception {
+			super(
+				rpcService,
+				endpointId,
+				configuration,
+				highAvailabilityServices,
+				submittedJobGraphStore,
+				resourceManagerGateway,
+				blobServer,
+				heartbeatServices,
+				jobManagerMetricGroup,
+				metricServiceQueryPath,
+				archivedExecutionGraphStore,
+				jobManagerRunnerFactory,
+				fatalErrorHandler,
+				null, // restAddress: no longer a constructor parameter, always null here
+				VoidHistoryServerArchivist.INSTANCE); // historyServerArchivist: fixed here instead of being passed in
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ef0a84e8/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index c068020..a25fe7a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.dispatcher;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.BlobServerOptions;
@@ -48,9 +47,7 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -79,7 +76,6 @@ import org.junit.rules.TemporaryFolder;
 import org.junit.rules.TestName;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.IOException;
@@ -535,46 +531,6 @@ public class DispatcherTest extends TestLogger {
 		}
 	}
 
-	private static class TestingDispatcher extends Dispatcher {
-
-		private TestingDispatcher(
-				RpcService rpcService,
-				String endpointId,
-				Configuration configuration,
-				HighAvailabilityServices highAvailabilityServices,
-				ResourceManagerGateway resourceManagerGateway,
-				BlobServer blobServer,
-				HeartbeatServices heartbeatServices,
-				JobManagerMetricGroup jobManagerMetricGroup,
-				@Nullable String metricQueryServicePath,
-				ArchivedExecutionGraphStore archivedExecutionGraphStore,
-				JobManagerRunnerFactory jobManagerRunnerFactory,
-				FatalErrorHandler fatalErrorHandler) throws Exception {
-			super(
-				rpcService,
-				endpointId,
-				configuration,
-				highAvailabilityServices,
-				highAvailabilityServices.getSubmittedJobGraphStore(),
-				resourceManagerGateway,
-				blobServer,
-				heartbeatServices,
-				jobManagerMetricGroup,
-				metricQueryServicePath,
-				archivedExecutionGraphStore,
-				jobManagerRunnerFactory,
-				fatalErrorHandler,
-				null,
-				VoidHistoryServerArchivist.INSTANCE);
-		}
-
-		@VisibleForTesting
-		void completeJobExecution(ArchivedExecutionGraph archivedExecutionGraph) {
-			runAsync(
-				() -> jobReachedGloballyTerminalState(archivedExecutionGraph));
-		}
-	}
-
 	private static final class ExpectedJobIdJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFactory {
 
 		private final JobID expectedJobId;

http://git-wip-us.apache.org/repos/asf/flink/blob/ef0a84e8/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index bfe6527..39c06f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -23,24 +23,17 @@ import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobmaster.JobManagerRunner;
-import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
 import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
@@ -65,8 +58,6 @@ import java.util.concurrent.TimeoutException;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * Tests for the {@link MiniDispatcher}.
@@ -260,36 +251,4 @@ public class MiniDispatcherTest extends TestLogger {
 			executionMode);
 	}
 
-	private static final class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFactory {
-
-		private final CompletableFuture<JobGraph> jobGraphFuture;
-		private final CompletableFuture<ArchivedExecutionGraph> resultFuture;
-
-		private TestingJobManagerRunnerFactory(CompletableFuture<JobGraph> jobGraphFuture, CompletableFuture<ArchivedExecutionGraph> resultFuture) {
-			this.jobGraphFuture = jobGraphFuture;
-			this.resultFuture = resultFuture;
-		}
-
-		@Override
-		public JobManagerRunner createJobManagerRunner(
-				ResourceID resourceId,
-				JobGraph jobGraph,
-				Configuration configuration,
-				RpcService rpcService,
-				HighAvailabilityServices highAvailabilityServices,
-				HeartbeatServices heartbeatServices,
-				BlobServer blobServer,
-				JobManagerSharedServices jobManagerSharedServices,
-				JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
-				FatalErrorHandler fatalErrorHandler) throws Exception {
-			jobGraphFuture.complete(jobGraph);
-
-			final JobManagerRunner mock = mock(JobManagerRunner.class);
-			when(mock.getResultFuture()).thenReturn(resultFuture);
-			when(mock.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
-
-			return mock;
-		}
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ef0a84e8/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
new file mode 100644
index 0000000..f5091ea
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link Dispatcher} implementation used for testing purposes; it takes the job graph store from the given HA services.
+ */
+class TestingDispatcher extends Dispatcher {
+
+	TestingDispatcher(
+		RpcService rpcService,
+		String endpointId,
+		Configuration configuration,
+		HighAvailabilityServices highAvailabilityServices,
+		ResourceManagerGateway resourceManagerGateway,
+		BlobServer blobServer,
+		HeartbeatServices heartbeatServices,
+		JobManagerMetricGroup jobManagerMetricGroup,
+		@Nullable String metricQueryServicePath,
+		ArchivedExecutionGraphStore archivedExecutionGraphStore,
+		JobManagerRunnerFactory jobManagerRunnerFactory,
+		FatalErrorHandler fatalErrorHandler) throws Exception {
+		super(
+			rpcService,
+			endpointId,
+			configuration,
+			highAvailabilityServices,
+			highAvailabilityServices.getSubmittedJobGraphStore(), // store comes from the HA services, so tests configure it there
+			resourceManagerGateway,
+			blobServer,
+			heartbeatServices,
+			jobManagerMetricGroup,
+			metricQueryServicePath,
+			archivedExecutionGraphStore,
+			jobManagerRunnerFactory,
+			fatalErrorHandler,
+			null, // restAddress: not configurable through this testing constructor
+			VoidHistoryServerArchivist.INSTANCE); // historyServerArchivist: fixed for all tests
+	}
+
+	@VisibleForTesting
+	void completeJobExecution(ArchivedExecutionGraph archivedExecutionGraph) {
+		runAsync( // deferred via runAsync rather than called inline; NOTE(review): presumably runs on the endpoint's main thread - confirm
+			() -> jobReachedGloballyTerminalState(archivedExecutionGraph));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ef0a84e8/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
new file mode 100644
index 0000000..f9be888
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
+import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * {@link org.apache.flink.runtime.dispatcher.Dispatcher.JobManagerRunnerFactory} implementation for
+ * testing purposes: it publishes the received job graph via a future and hands out a Mockito-mocked runner.
+ */
+final class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFactory {
+
+	private final CompletableFuture<JobGraph> jobGraphFuture; // completed with the job graph given to createJobManagerRunner
+	private final CompletableFuture<ArchivedExecutionGraph> resultFuture; // returned by the mocked runner's getResultFuture()
+
+	TestingJobManagerRunnerFactory(CompletableFuture<JobGraph> jobGraphFuture, CompletableFuture<ArchivedExecutionGraph> resultFuture) {
+		this.jobGraphFuture = jobGraphFuture;
+		this.resultFuture = resultFuture;
+	}
+
+	@Override
+	public JobManagerRunner createJobManagerRunner(
+			ResourceID resourceId,
+			JobGraph jobGraph,
+			Configuration configuration,
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityServices,
+			HeartbeatServices heartbeatServices,
+			BlobServer blobServer,
+			JobManagerSharedServices jobManagerSharedServices,
+			JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
+			FatalErrorHandler fatalErrorHandler) throws Exception {
+		jobGraphFuture.complete(jobGraph); // only the first job graph is recorded: complete() is a no-op on an already completed future
+
+		final JobManagerRunner mock = mock(JobManagerRunner.class); // all arguments other than jobGraph are ignored
+		when(mock.getResultFuture()).thenReturn(resultFuture);
+		when(mock.closeAsync()).thenReturn(CompletableFuture.completedFuture(null)); // closing the mock finishes immediately
+
+		return mock;
+	}
+}