You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/24 19:47:20 UTC

[flink] 05/06: [hotfix][tests] Add TestingHighAvailabilityServicesBuilder

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8ce4e0c26398d9af427adf83a6a77bfa549a6cf1
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Jan 18 14:24:56 2019 +0100

    [hotfix][tests] Add TestingHighAvailabilityServicesBuilder
    
    The builder creates a TestingHighAvailabilityServices with the standalone
    implementation of the individual services.
---
 .../flink/runtime/dispatcher/DispatcherHATest.java |   9 +-
 .../TestingHighAvailabilityServices.java           |  18 ++-
 .../TestingHighAvailabilityServicesBuilder.java    | 136 +++++++++++++++++++++
 3 files changed, 156 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
index 2af7271..a6ad3fb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -27,9 +27,9 @@ import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
@@ -156,11 +156,10 @@ public class DispatcherHATest extends TestLogger {
 	@Test
 	public void testRevokeLeadershipTerminatesJobManagerRunners() throws Exception {
 
-		final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
-		highAvailabilityServices.setSubmittedJobGraphStore(new StandaloneSubmittedJobGraphStore());
-
 		final TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
-		highAvailabilityServices.setDispatcherLeaderElectionService(leaderElectionService);
+		final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServicesBuilder()
+			.setDispatcherLeaderElectionService(leaderElectionService)
+			.build();
 
 		final ArrayBlockingQueue<DispatcherId> fencingTokens = new ArrayBlockingQueue<>(2);
 		final HATestingDispatcher dispatcher = createDispatcher(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 2489f16..6040335 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 
 import java.io.IOException;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
 
 /**
  * A variant of the HighAvailabilityServices for testing. Each individual service can be set
@@ -42,6 +43,10 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 
 	private volatile LeaderRetrievalService webMonitorEndpointLeaderRetriever;
 
+	private volatile Function<JobID, LeaderRetrievalService> jobMasterLeaderRetrieverFunction = ignored -> null;
+
+	private volatile Function<JobID, LeaderElectionService> jobMasterLeaderElectionServiceFunction = ignored -> null;
+
 	private ConcurrentHashMap<JobID, LeaderRetrievalService> jobMasterLeaderRetrievers = new ConcurrentHashMap<>();
 
 	private ConcurrentHashMap<JobID, LeaderElectionService> jobManagerLeaderElectionServices = new ConcurrentHashMap<>();
@@ -105,6 +110,15 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 	public void setRunningJobsRegistry(RunningJobsRegistry runningJobsRegistry) {
 		this.runningJobsRegistry = runningJobsRegistry;
 	}
+
+	public void setJobMasterLeaderElectionServiceFunction(Function<JobID, LeaderElectionService> jobMasterLeaderElectionServiceFunction) {
+		this.jobMasterLeaderElectionServiceFunction = jobMasterLeaderElectionServiceFunction; // handed to computeIfAbsent in getJobManagerLeaderElectionService(JobID)
+	}
+
+	public void setJobMasterLeaderRetrieverFunction(Function<JobID, LeaderRetrievalService> jobMasterLeaderRetrieverFunction) {
+		this.jobMasterLeaderRetrieverFunction = jobMasterLeaderRetrieverFunction; // handed to computeIfAbsent in getJobManagerLeaderRetriever(JobID)
+	}
+
 	// ------------------------------------------------------------------------
 	//  HA Services Methods
 	// ------------------------------------------------------------------------
@@ -131,7 +145,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 
 	@Override
 	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
-		LeaderRetrievalService service = this.jobMasterLeaderRetrievers.get(jobID);
+		LeaderRetrievalService service = jobMasterLeaderRetrievers.computeIfAbsent(jobID, jobMasterLeaderRetrieverFunction);
 		if (service != null) {
 			return service;
 		} else {
@@ -173,7 +187,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 
 	@Override
 	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
-		LeaderElectionService service = this.jobManagerLeaderElectionServices.get(jobID);
+		LeaderElectionService service = jobManagerLeaderElectionServices.computeIfAbsent(jobID, jobMasterLeaderElectionServiceFunction);
 
 		if (service != null) {
 			return service;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServicesBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServicesBuilder.java
new file mode 100644
index 0000000..247860a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServicesBuilder.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneRunningJobsRegistry;
+import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
+
+import java.util.function.Function;
+
+/**
+ * Builder for the {@link TestingHighAvailabilityServices} that pre-populates every service with its standalone implementation.
+ */
+public class TestingHighAvailabilityServicesBuilder {
+	// Defaults below are instantiated once per builder, so every build() call passes the same instances on; only the two job master functions create a fresh service per invocation.
+	private LeaderRetrievalService resourceManagerLeaderRetriever = new StandaloneLeaderRetrievalService("localhost", HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+	private LeaderRetrievalService dispatcherLeaderRetriever = new StandaloneLeaderRetrievalService("localhost", HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+	private LeaderRetrievalService webMonitorEndpointLeaderRetriever = new StandaloneLeaderRetrievalService("localhost", HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+	private Function<JobID, LeaderRetrievalService> jobMasterLeaderRetrieverFunction = jobId -> new StandaloneLeaderRetrievalService("localhost", HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+	private Function<JobID, LeaderElectionService> jobMasterLeaderElectionServiceFunction = jobId -> new StandaloneLeaderElectionService();
+
+	private LeaderElectionService resourceManagerLeaderElectionService = new StandaloneLeaderElectionService();
+
+	private LeaderElectionService dispatcherLeaderElectionService = new StandaloneLeaderElectionService();
+
+	private LeaderElectionService webMonitorEndpointLeaderElectionService = new StandaloneLeaderElectionService();
+
+	private CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory();
+
+	private SubmittedJobGraphStore submittedJobGraphStore = new StandaloneSubmittedJobGraphStore();
+
+	private RunningJobsRegistry runningJobsRegistry = new StandaloneRunningJobsRegistry();
+	// Creates a fresh TestingHighAvailabilityServices and copies every configured service into it via its setters.
+	public TestingHighAvailabilityServices build() {
+		final TestingHighAvailabilityServices testingHighAvailabilityServices = new TestingHighAvailabilityServices();
+
+		testingHighAvailabilityServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever);
+		testingHighAvailabilityServices.setDispatcherLeaderRetriever(dispatcherLeaderRetriever);
+		testingHighAvailabilityServices.setWebMonitorEndpointLeaderRetriever(webMonitorEndpointLeaderRetriever);
+
+		testingHighAvailabilityServices.setJobMasterLeaderRetrieverFunction(jobMasterLeaderRetrieverFunction);
+		testingHighAvailabilityServices.setJobMasterLeaderElectionServiceFunction(jobMasterLeaderElectionServiceFunction);
+
+		testingHighAvailabilityServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService);
+		testingHighAvailabilityServices.setDispatcherLeaderElectionService(dispatcherLeaderElectionService);
+		testingHighAvailabilityServices.setWebMonitorEndpointLeaderElectionService(webMonitorEndpointLeaderElectionService);
+
+		testingHighAvailabilityServices.setCheckpointRecoveryFactory(checkpointRecoveryFactory);
+		testingHighAvailabilityServices.setSubmittedJobGraphStore(submittedJobGraphStore);
+		testingHighAvailabilityServices.setRunningJobsRegistry(runningJobsRegistry);
+
+		return testingHighAvailabilityServices;
+	}
+	// Fluent setters: each one overrides a single default and returns this builder to allow chaining.
+	public TestingHighAvailabilityServicesBuilder setResourceManagerLeaderRetriever(LeaderRetrievalService resourceManagerLeaderRetriever) {
+		this.resourceManagerLeaderRetriever = resourceManagerLeaderRetriever;
+		return this;
+	}
+
+	public TestingHighAvailabilityServicesBuilder setDispatcherLeaderRetriever(LeaderRetrievalService dispatcherLeaderRetriever) {
+		this.dispatcherLeaderRetriever = dispatcherLeaderRetriever;
+		return this;
+	}
+
+	public TestingHighAvailabilityServicesBuilder setWebMonitorEndpointLeaderRetriever(LeaderRetrievalService webMonitorEndpointLeaderRetriever) {
+		this.webMonitorEndpointLeaderRetriever = webMonitorEndpointLeaderRetriever;
+		return this;
+	}
+
+	public TestingHighAvailabilityServicesBuilder setJobMasterLeaderRetrieverFunction(Function<JobID, LeaderRetrievalService> jobMasterLeaderRetrieverFunction) {
+		this.jobMasterLeaderRetrieverFunction = jobMasterLeaderRetrieverFunction;
+		return this;
+	}
+
+	public TestingHighAvailabilityServicesBuilder setJobMasterLeaderElectionServiceFunction(Function<JobID, LeaderElectionService> jobMasterLeaderElectionServiceFunction) {
+		this.jobMasterLeaderElectionServiceFunction = jobMasterLeaderElectionServiceFunction;
+		return this;
+	}
+
+	public TestingHighAvailabilityServicesBuilder setResourceManagerLeaderElectionService(LeaderElectionService resourceManagerLeaderElectionService) {
+		this.resourceManagerLeaderElectionService = resourceManagerLeaderElectionService;
+		return this;
+	}
+
+	public TestingHighAvailabilityServicesBuilder setDispatcherLeaderElectionService(LeaderElectionService dispatcherLeaderElectionService) {
+		this.dispatcherLeaderElectionService = dispatcherLeaderElectionService;
+		return this;
+	}
+
+	public TestingHighAvailabilityServicesBuilder setWebMonitorEndpointLeaderElectionService(LeaderElectionService webMonitorEndpointLeaderElectionService) {
+		this.webMonitorEndpointLeaderElectionService = webMonitorEndpointLeaderElectionService;
+		return this;
+	}
+
+	public TestingHighAvailabilityServicesBuilder setCheckpointRecoveryFactory(CheckpointRecoveryFactory checkpointRecoveryFactory) {
+		this.checkpointRecoveryFactory = checkpointRecoveryFactory;
+		return this;
+	}
+
+	public TestingHighAvailabilityServicesBuilder setSubmittedJobGraphStore(SubmittedJobGraphStore submittedJobGraphStore) {
+		this.submittedJobGraphStore = submittedJobGraphStore;
+		return this;
+	}
+
+	public TestingHighAvailabilityServicesBuilder setRunningJobsRegistry(RunningJobsRegistry runningJobsRegistry) {
+		this.runningJobsRegistry = runningJobsRegistry;
+		return this;
+	}
+}