You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/24 19:47:20 UTC
[flink] 05/06: [hotfix][tests] Add TestingHighAvailabilityServicesBuilder
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8ce4e0c26398d9af427adf83a6a77bfa549a6cf1
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Jan 18 14:24:56 2019 +0100
[hotfix][tests] Add TestingHighAvailabilityServicesBuilder
The builder creates a TestingHighAvailabilityServices with the standalone
implementation of the individual services.
---
.../flink/runtime/dispatcher/DispatcherHATest.java | 9 +-
.../TestingHighAvailabilityServices.java | 18 ++-
.../TestingHighAvailabilityServicesBuilder.java | 136 +++++++++++++++++++++
3 files changed, 156 insertions(+), 7 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
index 2af7271..a6ad3fb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -27,9 +27,9 @@ import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
@@ -156,11 +156,10 @@ public class DispatcherHATest extends TestLogger {
@Test
public void testRevokeLeadershipTerminatesJobManagerRunners() throws Exception {
- final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
- highAvailabilityServices.setSubmittedJobGraphStore(new StandaloneSubmittedJobGraphStore());
-
final TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
- highAvailabilityServices.setDispatcherLeaderElectionService(leaderElectionService);
+ final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServicesBuilder()
+ .setDispatcherLeaderElectionService(leaderElectionService)
+ .build();
final ArrayBlockingQueue<DispatcherId> fencingTokens = new ArrayBlockingQueue<>(2);
final HATestingDispatcher dispatcher = createDispatcher(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 2489f16..6040335 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
/**
* A variant of the HighAvailabilityServices for testing. Each individual service can be set
@@ -42,6 +43,10 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
private volatile LeaderRetrievalService webMonitorEndpointLeaderRetriever;
+ private volatile Function<JobID, LeaderRetrievalService> jobMasterLeaderRetrieverFunction = ignored -> null;
+
+ private volatile Function<JobID, LeaderElectionService> jobMasterLeaderElectionServiceFunction = ignored -> null;
+
private ConcurrentHashMap<JobID, LeaderRetrievalService> jobMasterLeaderRetrievers = new ConcurrentHashMap<>();
private ConcurrentHashMap<JobID, LeaderElectionService> jobManagerLeaderElectionServices = new ConcurrentHashMap<>();
@@ -105,6 +110,15 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
public void setRunningJobsRegistry(RunningJobsRegistry runningJobsRegistry) {
this.runningJobsRegistry = runningJobsRegistry;
}
+
+ public void setJobMasterLeaderElectionServiceFunction(Function<JobID, LeaderElectionService> jobMasterLeaderElectionServiceFunction) {
+ this.jobMasterLeaderElectionServiceFunction = jobMasterLeaderElectionServiceFunction; // handed to computeIfAbsent in getJobManagerLeaderElectionService(JobID), i.e. only invoked for job IDs without a registered election service
+ }
+
+ public void setJobMasterLeaderRetrieverFunction(Function<JobID, LeaderRetrievalService> jobMasterLeaderRetrieverFunction) {
+ this.jobMasterLeaderRetrieverFunction = jobMasterLeaderRetrieverFunction; // handed to computeIfAbsent in getJobManagerLeaderRetriever(JobID), i.e. only invoked for job IDs without a registered retriever
+ }
+
// ------------------------------------------------------------------------
// HA Services Methods
// ------------------------------------------------------------------------
@@ -131,7 +145,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
@Override
public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
- LeaderRetrievalService service = this.jobMasterLeaderRetrievers.get(jobID);
+ LeaderRetrievalService service = jobMasterLeaderRetrievers.computeIfAbsent(jobID, jobMasterLeaderRetrieverFunction);
if (service != null) {
return service;
} else {
@@ -173,7 +187,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
@Override
public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
- LeaderElectionService service = this.jobManagerLeaderElectionServices.get(jobID);
+ LeaderElectionService service = jobManagerLeaderElectionServices.computeIfAbsent(jobID, jobMasterLeaderElectionServiceFunction);
if (service != null) {
return service;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServicesBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServicesBuilder.java
new file mode 100644
index 0000000..247860a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServicesBuilder.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneRunningJobsRegistry;
+import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
+
+import java.util.function.Function;
+
+/**
+ * Builder for the {@link TestingHighAvailabilityServices}.
+ *
+ * <p>Every service is pre-populated with a standalone default, so a test only has to
+ * override the services it actually cares about:
+ * <ul>
+ *   <li>leader retrievers: {@link StandaloneLeaderRetrievalService} created with the address
+ *       {@code "localhost"} and {@link HighAvailabilityServices#DEFAULT_LEADER_ID}</li>
+ *   <li>leader election services: {@link StandaloneLeaderElectionService}</li>
+ *   <li>{@link StandaloneCheckpointRecoveryFactory}, {@link StandaloneSubmittedJobGraphStore}
+ *       and {@link StandaloneRunningJobsRegistry}</li>
+ * </ul>
+ *
+ * <p>The per-job leader services are configured as functions of the {@link JobID}; the
+ * default functions create a new standalone instance on every invocation. All other
+ * defaults are instantiated once per builder. {@link #build()} creates a new
+ * {@link TestingHighAvailabilityServices} and hands it the builder's current references,
+ * so repeated {@code build()} calls on the same builder share those service instances.
+ *
+ * <p>All setters return this builder to allow call chaining.
+ */
+public class TestingHighAvailabilityServicesBuilder {
+
+ private LeaderRetrievalService resourceManagerLeaderRetriever = new StandaloneLeaderRetrievalService("localhost", HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+ private LeaderRetrievalService dispatcherLeaderRetriever = new StandaloneLeaderRetrievalService("localhost", HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+ private LeaderRetrievalService webMonitorEndpointLeaderRetriever = new StandaloneLeaderRetrievalService("localhost", HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+ private Function<JobID, LeaderRetrievalService> jobMasterLeaderRetrieverFunction = jobId -> new StandaloneLeaderRetrievalService("localhost", HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+ private Function<JobID, LeaderElectionService> jobMasterLeaderElectionServiceFunction = jobId -> new StandaloneLeaderElectionService();
+
+ private LeaderElectionService resourceManagerLeaderElectionService = new StandaloneLeaderElectionService();
+
+ private LeaderElectionService dispatcherLeaderElectionService = new StandaloneLeaderElectionService();
+
+ private LeaderElectionService webMonitorEndpointLeaderElectionService = new StandaloneLeaderElectionService();
+
+ private CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory();
+
+ private SubmittedJobGraphStore submittedJobGraphStore = new StandaloneSubmittedJobGraphStore();
+
+ private RunningJobsRegistry runningJobsRegistry = new StandaloneRunningJobsRegistry();
+
+ public TestingHighAvailabilityServices build() {
+ final TestingHighAvailabilityServices testingHighAvailabilityServices = new TestingHighAvailabilityServices();
+
+ testingHighAvailabilityServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever);
+ testingHighAvailabilityServices.setDispatcherLeaderRetriever(dispatcherLeaderRetriever);
+ testingHighAvailabilityServices.setWebMonitorEndpointLeaderRetriever(webMonitorEndpointLeaderRetriever);
+
+ testingHighAvailabilityServices.setJobMasterLeaderRetrieverFunction(jobMasterLeaderRetrieverFunction);
+ testingHighAvailabilityServices.setJobMasterLeaderElectionServiceFunction(jobMasterLeaderElectionServiceFunction);
+
+ testingHighAvailabilityServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService);
+ testingHighAvailabilityServices.setDispatcherLeaderElectionService(dispatcherLeaderElectionService);
+ testingHighAvailabilityServices.setWebMonitorEndpointLeaderElectionService(webMonitorEndpointLeaderElectionService);
+
+ testingHighAvailabilityServices.setCheckpointRecoveryFactory(checkpointRecoveryFactory);
+ testingHighAvailabilityServices.setSubmittedJobGraphStore(submittedJobGraphStore);
+ testingHighAvailabilityServices.setRunningJobsRegistry(runningJobsRegistry);
+
+ return testingHighAvailabilityServices;
+ }
+
+ public TestingHighAvailabilityServicesBuilder setResourceManagerLeaderRetriever(LeaderRetrievalService resourceManagerLeaderRetriever) {
+ this.resourceManagerLeaderRetriever = resourceManagerLeaderRetriever;
+ return this;
+ }
+
+ public TestingHighAvailabilityServicesBuilder setDispatcherLeaderRetriever(LeaderRetrievalService dispatcherLeaderRetriever) {
+ this.dispatcherLeaderRetriever = dispatcherLeaderRetriever;
+ return this;
+ }
+
+ public TestingHighAvailabilityServicesBuilder setWebMonitorEndpointLeaderRetriever(LeaderRetrievalService webMonitorEndpointLeaderRetriever) {
+ this.webMonitorEndpointLeaderRetriever = webMonitorEndpointLeaderRetriever;
+ return this;
+ }
+
+ public TestingHighAvailabilityServicesBuilder setJobMasterLeaderRetrieverFunction(Function<JobID, LeaderRetrievalService> jobMasterLeaderRetrieverFunction) {
+ this.jobMasterLeaderRetrieverFunction = jobMasterLeaderRetrieverFunction;
+ return this;
+ }
+
+ public TestingHighAvailabilityServicesBuilder setJobMasterLeaderElectionServiceFunction(Function<JobID, LeaderElectionService> jobMasterLeaderElectionServiceFunction) {
+ this.jobMasterLeaderElectionServiceFunction = jobMasterLeaderElectionServiceFunction;
+ return this;
+ }
+
+ public TestingHighAvailabilityServicesBuilder setResourceManagerLeaderElectionService(LeaderElectionService resourceManagerLeaderElectionService) {
+ this.resourceManagerLeaderElectionService = resourceManagerLeaderElectionService;
+ return this;
+ }
+
+ public TestingHighAvailabilityServicesBuilder setDispatcherLeaderElectionService(LeaderElectionService dispatcherLeaderElectionService) {
+ this.dispatcherLeaderElectionService = dispatcherLeaderElectionService;
+ return this;
+ }
+
+ public TestingHighAvailabilityServicesBuilder setWebMonitorEndpointLeaderElectionService(LeaderElectionService webMonitorEndpointLeaderElectionService) {
+ this.webMonitorEndpointLeaderElectionService = webMonitorEndpointLeaderElectionService;
+ return this;
+ }
+
+ public TestingHighAvailabilityServicesBuilder setCheckpointRecoveryFactory(CheckpointRecoveryFactory checkpointRecoveryFactory) {
+ this.checkpointRecoveryFactory = checkpointRecoveryFactory;
+ return this;
+ }
+
+ public TestingHighAvailabilityServicesBuilder setSubmittedJobGraphStore(SubmittedJobGraphStore submittedJobGraphStore) {
+ this.submittedJobGraphStore = submittedJobGraphStore;
+ return this;
+ }
+
+ public TestingHighAvailabilityServicesBuilder setRunningJobsRegistry(RunningJobsRegistry runningJobsRegistry) {
+ this.runningJobsRegistry = runningJobsRegistry;
+ return this;
+ }
+}