You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/24 19:47:15 UTC

[flink] branch master updated (35779f2 -> c841076)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 35779f2  [hotfix] Fix checkstyle violations in AkkaRpcActorTest
     new 0a8ed64  [hotfix] Remove unused JobMasterGateway#requestClassloadingProps
     new e76512b  [hotfix] Pass BlobWriter into JobMaster instead of BlobServer
     new 35a3d79  [hotfix] Move BlobWriter into JobManagerSharedServices
     new 9d9863e  [hotfix][tests] Remove BlobServer from JobManagerRunnerTest
     new 8ce4e0c  [hotfix][tests] Add TestingHighAvailabilityServicesBuilder
     new c841076  [FLINK-11354][tests] Port JobManagerHARecoveryTest to new code base

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/runtime/dispatcher/Dispatcher.java       |   9 +-
 .../flink/runtime/jobmaster/JobManagerRunner.java  |   3 -
 .../jobmaster/JobManagerSharedServices.java        |  18 +-
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  18 +-
 .../flink/runtime/jobmaster/JobMasterGateway.java  |   6 -
 .../VoidPermanentBlobService.java}                 |  19 +-
 .../flink/runtime/dispatcher/DispatcherHATest.java | 188 ++++++-
 .../flink/runtime/dispatcher/DispatcherTest.java   |   6 +-
 .../dispatcher/TestingJobManagerRunnerFactory.java |   2 -
 .../ContextClassLoaderLibraryCacheManager.java}    |  45 +-
 .../TestingHighAvailabilityServices.java           |  18 +-
 .../TestingHighAvailabilityServicesBuilder.java    | 136 +++++
 .../jobmanager/JobManagerHARecoveryTest.java       | 605 ---------------------
 .../runtime/jobmaster/JobManagerRunnerTest.java    |  39 +-
 .../flink/runtime/jobmaster/JobMasterTest.java     |  14 -
 .../TestingJobManagerSharedServicesBuilder.java    |  15 +-
 .../jobmaster/utils/TestingJobMasterGateway.java   |   5 -
 17 files changed, 415 insertions(+), 731 deletions(-)
 copy flink-runtime/src/test/java/org/apache/flink/runtime/{dispatcher/NoOpSubmittedJobGraphListener.java => blob/VoidPermanentBlobService.java} (65%)
 copy flink-runtime/src/{main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java => test/java/org/apache/flink/runtime/execution/librarycache/ContextClassLoaderLibraryCacheManager.java} (50%)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServicesBuilder.java
 delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java


[flink] 02/06: [hotfix] Pass BlobWriter into JobMaster instead of BlobServer

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e76512b44c3e41db75387fe81b3bb6d7289a0fe7
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Jan 18 11:58:19 2019 +0100

    [hotfix] Pass BlobWriter into JobMaster instead of BlobServer
---
 .../java/org/apache/flink/runtime/jobmaster/JobMaster.java     | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index f892fd7..5a5ca19 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -30,7 +30,7 @@ import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.StoppingException;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
-import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
@@ -163,7 +163,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 	private final HighAvailabilityServices highAvailabilityServices;
 
-	private final BlobServer blobServer;
+	private final BlobWriter blobWriter;
 
 	private final JobManagerJobMetricGroupFactory jobMetricGroupFactory;
 
@@ -230,7 +230,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			SlotPoolFactory slotPoolFactory,
 			JobManagerSharedServices jobManagerSharedServices,
 			HeartbeatServices heartbeatServices,
-			BlobServer blobServer,
+			BlobWriter blobWriter,
 			JobManagerJobMetricGroupFactory jobMetricGroupFactory,
 			OnCompletionActions jobCompletionActions,
 			FatalErrorHandler fatalErrorHandler,
@@ -245,7 +245,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 		this.jobGraph = checkNotNull(jobGraph);
 		this.rpcTimeout = jobMasterConfiguration.getRpcTimeout();
 		this.highAvailabilityServices = checkNotNull(highAvailabilityService);
-		this.blobServer = checkNotNull(blobServer);
+		this.blobWriter = checkNotNull(blobWriter);
 		this.scheduledExecutorService = jobManagerSharedServices.getScheduledExecutorService();
 		this.jobCompletionActions = checkNotNull(jobCompletionActions);
 		this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
@@ -1173,7 +1173,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			rpcTimeout,
 			restartStrategy,
 			currentJobManagerJobMetricGroup,
-			blobServer,
+			blobWriter,
 			jobMasterConfiguration.getSlotRequestTimeout(),
 			log);
 	}


[flink] 06/06: [FLINK-11354][tests] Port JobManagerHARecoveryTest to new code base

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c8410766515740784aea968662e6dd722b4b2d70
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Jan 18 14:25:24 2019 +0100

    [FLINK-11354][tests] Port JobManagerHARecoveryTest to new code base
    
    - Moved JobManagerHARecoveryTest#testJobRecoveryWhenLosingLeadership to
    DispatcherHATest#testJobRecoveryWhenChangingLeadership
    
    - Moved JobManagerHARecoveryTest#testFailingJobRecovery to
    DispatcherHATest#testFailingRecoveryIsAFatalError
    
    This closes #7539.
---
 .../flink/runtime/dispatcher/Dispatcher.java       |   5 +-
 .../flink/runtime/dispatcher/DispatcherHATest.java | 179 +++++-
 .../jobmanager/JobManagerHARecoveryTest.java       | 605 ---------------------
 3 files changed, 172 insertions(+), 617 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 544a438..81b826e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -834,7 +834,10 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 				confirmationFuture.whenComplete(
 					(Void ignored, Throwable throwable) -> {
 						if (throwable != null) {
-							onFatalError(ExceptionUtils.stripCompletionException(throwable));
+							onFatalError(
+								new DispatcherException(
+									String.format("Failed to take leadership with session id %s.", newLeaderSessionID),
+									(ExceptionUtils.stripCompletionException(throwable))));
 						}
 					});
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
index a6ad3fb..b965e71 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -43,7 +43,10 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.InMemorySubmittedJobGraphStore;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
@@ -64,6 +67,7 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
@@ -125,7 +129,7 @@ public class DispatcherHATest extends TestLogger {
 
 		final BlockingQueue<DispatcherId> fencingTokens = new ArrayBlockingQueue<>(2);
 
-		final HATestingDispatcher dispatcher = createDispatcher(highAvailabilityServices, fencingTokens);
+		final HATestingDispatcher dispatcher = createDispatcherWithObservableFencingTokens(highAvailabilityServices, fencingTokens);
 
 		dispatcher.start();
 
@@ -162,7 +166,7 @@ public class DispatcherHATest extends TestLogger {
 			.build();
 
 		final ArrayBlockingQueue<DispatcherId> fencingTokens = new ArrayBlockingQueue<>(2);
-		final HATestingDispatcher dispatcher = createDispatcher(
+		final HATestingDispatcher dispatcher = createDispatcherWithObservableFencingTokens(
 			highAvailabilityServices,
 			fencingTokens);
 
@@ -195,20 +199,117 @@ public class DispatcherHATest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that a Dispatcher does not remove the JobGraph from the submitted job graph store
+	 * when losing leadership and recovers it when regaining leadership.
+	 */
+	@Test
+	public void testJobRecoveryWhenChangingLeadership() throws Exception {
+		final InMemorySubmittedJobGraphStore submittedJobGraphStore = new InMemorySubmittedJobGraphStore();
+
+		final CompletableFuture<JobID> recoveredJobFuture = new CompletableFuture<>();
+		submittedJobGraphStore.setRecoverJobGraphFunction((jobID, jobIDSubmittedJobGraphMap) -> {
+			recoveredJobFuture.complete(jobID);
+			return jobIDSubmittedJobGraphMap.get(jobID);
+		});
+
+		final TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
+
+		final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServicesBuilder()
+			.setSubmittedJobGraphStore(submittedJobGraphStore)
+			.setDispatcherLeaderElectionService(leaderElectionService)
+			.build();
+
+		final ArrayBlockingQueue<DispatcherId> fencingTokens = new ArrayBlockingQueue<>(2);
+		final HATestingDispatcher dispatcher = createDispatcherWithObservableFencingTokens(
+			highAvailabilityServices,
+			fencingTokens);
+
+		dispatcher.start();
+
+		try {
+			// grant leadership and submit a single job
+			final DispatcherId expectedDispatcherId = DispatcherId.generate();
+			leaderElectionService.isLeader(expectedDispatcherId.toUUID()).get();
+
+			assertThat(fencingTokens.take(), is(equalTo(expectedDispatcherId)));
+
+			final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+
+			final JobGraph jobGraph = createNonEmptyJobGraph();
+			final CompletableFuture<Acknowledge> submissionFuture = dispatcherGateway.submitJob(jobGraph, timeout);
+
+			submissionFuture.get();
+
+			final JobID jobId = jobGraph.getJobID();
+			assertThat(submittedJobGraphStore.contains(jobId), is(true));
+
+			// revoke the leadership --> this should stop all running JobManagerRunners
+			leaderElectionService.notLeader();
+
+			assertThat(fencingTokens.take(), is(equalTo(NULL_FENCING_TOKEN)));
+
+			assertThat(submittedJobGraphStore.contains(jobId), is(true));
+
+			assertThat(recoveredJobFuture.isDone(), is(false));
+
+			// re-grant leadership
+			leaderElectionService.isLeader(DispatcherId.generate().toUUID());
+
+			assertThat(recoveredJobFuture.get(), is(equalTo(jobId)));
+		} finally {
+			RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
+		}
+	}
+
+	/**
+	 * Tests that a fatal error is reported if the job recovery fails.
+	 */
+	@Test
+	public void testFailingRecoveryIsAFatalError() throws Exception {
+		final String exceptionMessage = "Job recovery test failure.";
+		final Supplier<Exception> exceptionSupplier = () -> new FlinkException(exceptionMessage);
+		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServicesBuilder()
+			.setSubmittedJobGraphStore(new FailingSubmittedJobGraphStore(exceptionSupplier))
+			.build();
+
+		final HATestingDispatcher dispatcher = createDispatcher(haServices);
+		dispatcher.start();
+
+		final Throwable failure = testingFatalErrorHandler.getErrorFuture().get();
+
+		assertThat(ExceptionUtils.findThrowableWithMessage(failure, exceptionMessage).isPresent(), is(true));
+
+		testingFatalErrorHandler.clearError();
+	}
+
 	@Nonnull
-	public static JobGraph createNonEmptyJobGraph() {
-		final JobVertex noOpVertex = new JobVertex("NoOp vertex");
-		noOpVertex.setInvokableClass(NoOpInvokable.class);
-		final JobGraph jobGraph = new JobGraph(noOpVertex);
-		jobGraph.setAllowQueuedScheduling(true);
+	private HATestingDispatcher createDispatcherWithObservableFencingTokens(HighAvailabilityServices highAvailabilityServices, Queue<DispatcherId> fencingTokens) throws Exception {
+		return createDispatcher(highAvailabilityServices, fencingTokens, createTestingJobManagerRunnerFactory());
+	}
 
-		return jobGraph;
+	@Nonnull
+	private TestingJobManagerRunnerFactory createTestingJobManagerRunnerFactory() {
+		return new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>(), CompletableFuture.completedFuture(null));
+	}
+
+	@Nonnull
+	private HATestingDispatcher createDispatcherWithJobManagerRunnerFactory(HighAvailabilityServices highAvailabilityServices, Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
+		return createDispatcher(highAvailabilityServices, null, jobManagerRunnerFactory);
+	}
+
+	private HATestingDispatcher createDispatcher(HighAvailabilityServices haServices) throws Exception {
+		return createDispatcher(
+			haServices,
+			null,
+			createTestingJobManagerRunnerFactory());
 	}
 
 	@Nonnull
 	private HATestingDispatcher createDispatcher(
-			TestingHighAvailabilityServices highAvailabilityServices,
-			@Nonnull Queue<DispatcherId> fencingTokens) throws Exception {
+		HighAvailabilityServices highAvailabilityServices,
+		@Nullable Queue<DispatcherId> fencingTokens,
+		Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
 		final Configuration configuration = new Configuration();
 
 		return new HATestingDispatcher(
@@ -222,11 +323,21 @@ public class DispatcherHATest extends TestLogger {
 			UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
 			null,
 			new MemoryArchivedExecutionGraphStore(),
-			new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>(), CompletableFuture.completedFuture(null)),
+			jobManagerRunnerFactory,
 			testingFatalErrorHandler,
 			fencingTokens);
 	}
 
+	@Nonnull
+	public static JobGraph createNonEmptyJobGraph() {
+		final JobVertex noOpVertex = new JobVertex("NoOp vertex");
+		noOpVertex.setInvokableClass(NoOpInvokable.class);
+		final JobGraph jobGraph = new JobGraph(noOpVertex);
+		jobGraph.setAllowQueuedScheduling(true);
+
+		return jobGraph;
+	}
+
 	private static class HATestingDispatcher extends TestingDispatcher {
 
 		@Nonnull
@@ -334,4 +445,50 @@ public class DispatcherHATest extends TestLogger {
 			return Collections.singleton(submittedJobGraph.getJobId());
 		}
 	}
+
+	private static class FailingSubmittedJobGraphStore implements SubmittedJobGraphStore {
+		private final JobID jobId = new JobID();
+
+		private final Supplier<Exception> exceptionSupplier;
+
+		private FailingSubmittedJobGraphStore(Supplier<Exception> exceptionSupplier) {
+			this.exceptionSupplier = exceptionSupplier;
+		}
+
+		@Override
+		public void start(SubmittedJobGraphListener jobGraphListener) throws Exception {
+
+		}
+
+		@Override
+		public void stop() throws Exception {
+
+		}
+
+		@Nullable
+		@Override
+		public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
+			throw exceptionSupplier.get();
+		}
+
+		@Override
+		public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
+
+		}
+
+		@Override
+		public void removeJobGraph(JobID jobId) throws Exception {
+
+		}
+
+		@Override
+		public void releaseJobGraph(JobID jobId) throws Exception {
+
+		}
+
+		@Override
+		public Collection<JobID> getJobIds() throws Exception {
+			return Collections.singleton(jobId);
+		}
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
deleted file mode 100644
index d991983..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ /dev/null
@@ -1,605 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.BlobServerOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.akka.ListeningBehaviour;
-import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
-import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
-import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
-import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
-import org.apache.flink.runtime.checkpoint.StateObjectCollection;
-import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
-import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
-import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
-import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.instance.InstanceManager;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
-import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
-import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.OperatorStreamStateHandle;
-import org.apache.flink.runtime.state.TaskStateManager;
-import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.runtime.testingUtils.TestingJobManager;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingMessages;
-import org.apache.flink.runtime.testingUtils.TestingTaskManager;
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testutils.InMemorySubmittedJobGraphStore;
-import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.TestLogger;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Identify;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.japi.pf.FI;
-import akka.japi.pf.ReceiveBuilder;
-import akka.pattern.Patterns;
-import akka.testkit.CallingThreadDispatcher;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestProbe;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import scala.Int;
-import scala.Option;
-import scala.PartialFunction;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-import scala.runtime.BoxedUnit;
-
-import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class JobManagerHARecoveryTest extends TestLogger {
-
-	private static ActorSystem system;
-
-	@Rule
-	public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-	@BeforeClass
-	public static void setup() {
-		system = AkkaUtils.createLocalActorSystem(new Configuration());
-	}
-
-	@AfterClass
-	public static void teardown() {
-		JavaTestKit.shutdownActorSystem(system);
-	}
-
-	/**
-	 * Tests that the persisted job is not removed from the SubmittedJobGraphStore if the JobManager
-	 * loses its leadership. Furthermore, it tests that the job manager can recover the job from
-	 * the SubmittedJobGraphStore and checkpoint state is recovered as well.
-	 */
-	@Test
-	public void testJobRecoveryWhenLosingLeadership() throws Exception {
-		FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
-		FiniteDuration jobRecoveryTimeout = new FiniteDuration(0, TimeUnit.SECONDS);
-		Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
-		Configuration flinkConfiguration = new Configuration();
-		UUID leaderSessionID = UUID.randomUUID();
-		UUID newLeaderSessionID = UUID.randomUUID();
-		int slots = 2;
-		ActorRef archive = null;
-		ActorRef jobManager = null;
-		ActorRef taskManager = null;
-
-		flinkConfiguration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
-		flinkConfiguration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString());
-		flinkConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, slots);
-		flinkConfiguration.setLong(BlobServerOptions.CLEANUP_INTERVAL, 3_600L);
-
-		try {
-			Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
-
-			InMemorySubmittedJobGraphStore submittedJobGraphStore = new InMemorySubmittedJobGraphStore();
-			submittedJobGraphStore.start(null);
-			CompletedCheckpointStore checkpointStore = new RecoverableCompletedCheckpointStore();
-			CheckpointIDCounter checkpointCounter = new StandaloneCheckpointIDCounter();
-			CheckpointRecoveryFactory checkpointStateFactory = new TestingCheckpointRecoveryFactory(checkpointStore, checkpointCounter);
-			TestingLeaderElectionService myLeaderElectionService = new TestingLeaderElectionService();
-			SettableLeaderRetrievalService myLeaderRetrievalService = new SettableLeaderRetrievalService(
-				null,
-				null);
-			TestingHighAvailabilityServices testingHighAvailabilityServices = new TestingHighAvailabilityServices();
-
-			testingHighAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, myLeaderRetrievalService);
-
-			InstanceManager instanceManager = new InstanceManager();
-			instanceManager.addInstanceListener(scheduler);
-
-			archive = system.actorOf(JobManager.getArchiveProps(MemoryArchivist.class, 10, Option.<Path>empty()));
-
-			BlobServer blobServer = new BlobServer(
-				flinkConfiguration,
-				testingHighAvailabilityServices.createBlobStore());
-			blobServer.start();
-			Props jobManagerProps = Props.create(
-				TestingJobManager.class,
-				flinkConfiguration,
-				TestingUtils.defaultExecutor(),
-				TestingUtils.defaultExecutor(),
-				instanceManager,
-				scheduler,
-				blobServer,
-				new BlobLibraryCacheManager(blobServer, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST, new String[0]),
-				archive,
-				new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
-				timeout,
-				myLeaderElectionService,
-				submittedJobGraphStore,
-				checkpointStateFactory,
-				jobRecoveryTimeout,
-				UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
-				Option.<String>empty());
-
-			jobManager = system.actorOf(jobManagerProps);
-			ActorGateway gateway = new AkkaActorGateway(jobManager, leaderSessionID);
-
-			taskManager = TaskManager.startTaskManagerComponentsAndActor(
-				flinkConfiguration,
-				ResourceID.generate(),
-				system,
-				testingHighAvailabilityServices,
-				NoOpMetricRegistry.INSTANCE,
-				"localhost",
-				Option.<String>apply("taskmanager"),
-				true,
-				TestingTaskManager.class);
-
-			ActorGateway tmGateway = new AkkaActorGateway(taskManager, leaderSessionID);
-
-			Future<Object> tmAlive = tmGateway.ask(TestingMessages.getAlive(), deadline.timeLeft());
-
-			Await.ready(tmAlive, deadline.timeLeft());
-
-			JobVertex sourceJobVertex = new JobVertex("Source");
-			sourceJobVertex.setInvokableClass(BlockingStatefulInvokable.class);
-			sourceJobVertex.setParallelism(slots);
-
-			JobGraph jobGraph = new JobGraph("TestingJob", sourceJobVertex);
-
-			List<JobVertexID> vertexId = Collections.singletonList(sourceJobVertex.getID());
-			jobGraph.setSnapshotSettings(new JobCheckpointingSettings(
-					vertexId,
-					vertexId,
-					vertexId,
-					new CheckpointCoordinatorConfiguration(
-						100L,
-						10L * 60L * 1000L,
-						0L,
-						1,
-						CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-						true),
-					null));
-
-			BlockingStatefulInvokable.initializeStaticHelpers(slots);
-
-			Future<Object> isLeader = gateway.ask(
-					TestingJobManagerMessages.getNotifyWhenLeader(),
-					deadline.timeLeft());
-
-			Future<Object> isConnectedToJobManager = tmGateway.ask(
-					new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager),
-					deadline.timeLeft());
-
-			// tell jobManager that he's the leader
-			myLeaderElectionService.isLeader(leaderSessionID);
-			// tell taskManager who's the leader
-			myLeaderRetrievalService.notifyListener(gateway.path(), leaderSessionID);
-
-			Await.ready(isLeader, deadline.timeLeft());
-			Await.ready(isConnectedToJobManager, deadline.timeLeft());
-
-			// submit blocking job
-			Future<Object> jobSubmitted = gateway.ask(
-					new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED),
-					deadline.timeLeft());
-
-			Await.ready(jobSubmitted, deadline.timeLeft());
-
-			// Wait for some checkpoints to complete
-			BlockingStatefulInvokable.awaitCompletedCheckpoints();
-
-			Future<Object> jobRemoved = gateway.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), deadline.timeLeft());
-
-			// Revoke leadership
-			myLeaderElectionService.notLeader();
-
-			// check that the job gets removed from the JobManager
-			Await.ready(jobRemoved, deadline.timeLeft());
-			// but stays in the submitted job graph store
-			assertTrue(submittedJobGraphStore.contains(jobGraph.getJobID()));
-
-			Future<Object> jobRunning = gateway.ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.RUNNING), deadline.timeLeft());
-
-			// Make JobManager again a leader
-			myLeaderElectionService.isLeader(newLeaderSessionID);
-			// tell the TaskManager about it
-			myLeaderRetrievalService.notifyListener(gateway.path(), newLeaderSessionID);
-
-			// wait that the job is recovered and reaches state RUNNING
-			Await.ready(jobRunning, deadline.timeLeft());
-
-			Future<Object> jobFinished = gateway.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), deadline.timeLeft());
-
-			BlockingInvokable.unblock();
-
-			// wait til the job has finished
-			Await.ready(jobFinished, deadline.timeLeft());
-
-			// check that the job has been removed from the submitted job graph store
-			assertFalse(submittedJobGraphStore.contains(jobGraph.getJobID()));
-
-			// Check that state has been recovered
-			long[] recoveredStates = BlockingStatefulInvokable.getRecoveredStates();
-			for (long state : recoveredStates) {
-				boolean isExpected = state >= BlockingStatefulInvokable.NUM_CHECKPOINTS_TO_COMPLETE;
-				assertTrue("Did not recover checkpoint state correctly, expecting >= " +
-						BlockingStatefulInvokable.NUM_CHECKPOINTS_TO_COMPLETE +
-						", but state was " + state, isExpected);
-			}
-		} finally {
-			if (archive != null) {
-				archive.tell(PoisonPill.getInstance(), ActorRef.noSender());
-			}
-
-			if (jobManager != null) {
-				jobManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
-			}
-
-			if (taskManager != null) {
-				taskManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
-			}
-		}
-	}
-
-	/**
-	 * Tests that a job recovery failure terminates the {@link JobManager}.
-	 */
-	@Test
-	public void testFailingJobRecovery() throws Exception {
-		final FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
-		final FiniteDuration jobRecoveryTimeout = new FiniteDuration(0, TimeUnit.SECONDS);
-		Deadline deadline = new FiniteDuration(1, TimeUnit.MINUTES).fromNow();
-		final Configuration flinkConfiguration = new Configuration();
-		UUID leaderSessionID = UUID.randomUUID();
-		ActorRef jobManager = null;
-		JobID jobId1 = new JobID();
-		JobID jobId2 = new JobID();
-
-		// set HA mode to zookeeper so that we try to recover jobs
-		flinkConfiguration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
-
-		try {
-			final SubmittedJobGraphStore submittedJobGraphStore = mock(SubmittedJobGraphStore.class);
-
-			SubmittedJobGraph submittedJobGraph = mock(SubmittedJobGraph.class);
-			when(submittedJobGraph.getJobId()).thenReturn(jobId2);
-
-			when(submittedJobGraphStore.getJobIds()).thenReturn(Arrays.asList(jobId1, jobId2));
-
-			// fail the first job recovery
-			when(submittedJobGraphStore.recoverJobGraph(eq(jobId1))).thenThrow(new Exception("Test exception"));
-			// succeed the second job recovery
-			when(submittedJobGraphStore.recoverJobGraph(eq(jobId2))).thenReturn(submittedJobGraph);
-
-			final TestingLeaderElectionService myLeaderElectionService = new TestingLeaderElectionService();
-
-			final Collection<JobID> recoveredJobs = new ArrayList<>(2);
-
-			BlobServer blobServer = mock(BlobServer.class);
-			Props jobManagerProps = Props.create(
-				TestingFailingHAJobManager.class,
-				flinkConfiguration,
-				TestingUtils.defaultExecutor(),
-				TestingUtils.defaultExecutor(),
-				mock(InstanceManager.class),
-				mock(Scheduler.class),
-				blobServer,
-				new BlobLibraryCacheManager(blobServer, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST, new String[0]),
-				ActorRef.noSender(),
-				new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
-				timeout,
-				myLeaderElectionService,
-				submittedJobGraphStore,
-				mock(CheckpointRecoveryFactory.class),
-				jobRecoveryTimeout,
-				UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
-				recoveredJobs).withDispatcher(CallingThreadDispatcher.Id());
-
-			jobManager = system.actorOf(jobManagerProps);
-
-			final TestProbe testProbe = new TestProbe(system);
-
-			testProbe.watch(jobManager);
-
-			Future<Object> started = Patterns.ask(jobManager, new Identify(42), deadline.timeLeft().toMillis());
-
-			Await.ready(started, deadline.timeLeft());
-
-			// make the job manager the leader --> this triggers the recovery of all jobs
-			myLeaderElectionService.isLeader(leaderSessionID);
-
-			// check that we did not recover any jobs
-			assertThat(recoveredJobs, is(empty()));
-
-			// verify that the JobManager terminated
-			testProbe.expectTerminated(jobManager, timeout);
-		} finally {
-			TestingUtils.stopActor(jobManager);
-		}
-	}
-
-	static class TestingFailingHAJobManager extends JobManager {
-
-		private final Collection<JobID> recoveredJobs;
-
-		public TestingFailingHAJobManager(
-			Configuration flinkConfiguration,
-			ScheduledExecutorService futureExecutor,
-			Executor ioExecutor,
-			InstanceManager instanceManager,
-			Scheduler scheduler,
-			BlobServer blobServer,
-			BlobLibraryCacheManager libraryCacheManager,
-			ActorRef archive,
-			RestartStrategyFactory restartStrategyFactory,
-			FiniteDuration timeout,
-			LeaderElectionService leaderElectionService,
-			SubmittedJobGraphStore submittedJobGraphs,
-			CheckpointRecoveryFactory checkpointRecoveryFactory,
-			FiniteDuration jobRecoveryTimeout,
-			JobManagerMetricGroup jobManagerMetricGroup,
-			Collection<JobID> recoveredJobs) {
-			super(
-				flinkConfiguration,
-				futureExecutor,
-				ioExecutor,
-				instanceManager,
-				scheduler,
-				blobServer,
-				libraryCacheManager,
-				archive,
-				restartStrategyFactory,
-				timeout,
-				leaderElectionService,
-				submittedJobGraphs,
-				checkpointRecoveryFactory,
-				jobRecoveryTimeout,
-				jobManagerMetricGroup,
-				Option.<String>empty());
-
-			this.recoveredJobs = recoveredJobs;
-		}
-
-		@Override
-		public PartialFunction<Object, BoxedUnit> handleMessage() {
-			return ReceiveBuilder.match(
-				JobManagerMessages.RecoverSubmittedJob.class,
-				new FI.UnitApply<JobManagerMessages.RecoverSubmittedJob>() {
-					@Override
-					public void apply(JobManagerMessages.RecoverSubmittedJob submitJob) throws Exception {
-						recoveredJobs.add(submitJob.submittedJobGraph().getJobId());
-					}
-				}).matchAny(new FI.UnitApply<Object>() {
-				@Override
-				public void apply(Object o) throws Exception {
-					TestingFailingHAJobManager.super.handleMessage().apply(o);
-				}
-			}).build();
-		}
-	}
-
-	public static class BlockingInvokable extends AbstractInvokable {
-
-		private static final OneShotLatch LATCH = new OneShotLatch();
-
-		public BlockingInvokable(Environment environment) {
-			super(environment);
-		}
-
-		@Override
-		public void invoke() throws Exception {
-
-			OperatorID operatorID = OperatorID.fromJobVertexID(getEnvironment().getJobVertexId());
-			TaskStateManager taskStateManager = getEnvironment().getTaskStateManager();
-			PrioritizedOperatorSubtaskState subtaskState = taskStateManager.prioritizedOperatorState(operatorID);
-
-			int subtaskIndex = getIndexInSubtaskGroup();
-			if (subtaskIndex < BlockingStatefulInvokable.recoveredStates.length) {
-				Iterator<OperatorStateHandle> iterator =
-					subtaskState.getJobManagerManagedOperatorState().iterator();
-
-				if (iterator.hasNext()) {
-					OperatorStateHandle operatorStateHandle = iterator.next();
-					try (FSDataInputStream in = operatorStateHandle.openInputStream()) {
-						BlockingStatefulInvokable.recoveredStates[subtaskIndex] =
-							InstantiationUtil.deserializeObject(in, getUserCodeClassLoader());
-					}
-				}
-				Assert.assertFalse(iterator.hasNext());
-			}
-
-			LATCH.await();
-		}
-
-		public static void unblock() {
-			LATCH.trigger();
-		}
-	}
-
-	public static class BlockingStatefulInvokable extends BlockingInvokable {
-
-		private static final int NUM_CHECKPOINTS_TO_COMPLETE = 5;
-
-		private static volatile CountDownLatch completedCheckpointsLatch = new CountDownLatch(1);
-
-		static volatile long[] recoveredStates = new long[0];
-
-		private int completedCheckpoints = 0;
-
-		public BlockingStatefulInvokable(Environment environment) {
-			super(environment);
-		}
-
-		@Override
-		public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
-			ByteStreamStateHandle byteStreamStateHandle = new ByteStreamStateHandle(
-					String.valueOf(UUID.randomUUID()),
-					InstantiationUtil.serializeObject(checkpointMetaData.getCheckpointId()));
-
-			Map<String, OperatorStateHandle.StateMetaInfo> stateNameToPartitionOffsets = new HashMap<>(1);
-			stateNameToPartitionOffsets.put(
-				"test-state",
-				new OperatorStateHandle.StateMetaInfo(new long[]{0L}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
-
-			OperatorStateHandle operatorStateHandle = new OperatorStreamStateHandle(stateNameToPartitionOffsets, byteStreamStateHandle);
-
-			TaskStateSnapshot checkpointStateHandles = new TaskStateSnapshot();
-			checkpointStateHandles.putSubtaskStateByOperatorID(
-				OperatorID.fromJobVertexID(getEnvironment().getJobVertexId()),
-				new OperatorSubtaskState(
-					StateObjectCollection.singleton(operatorStateHandle),
-					StateObjectCollection.empty(),
-					StateObjectCollection.empty(),
-					StateObjectCollection.empty()));
-
-			getEnvironment().acknowledgeCheckpoint(
-					checkpointMetaData.getCheckpointId(),
-					new CheckpointMetrics(0L, 0L, 0L, 0L),
-					checkpointStateHandles);
-			return true;
-		}
-
-		@Override
-		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
-			throw new UnsupportedOperationException("should not be called!");
-		}
-
-		@Override
-		public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {
-			throw new UnsupportedOperationException("should not be called!");
-		}
-
-		@Override
-		public void notifyCheckpointComplete(long checkpointId) {
-			if (completedCheckpoints++ > NUM_CHECKPOINTS_TO_COMPLETE) {
-				completedCheckpointsLatch.countDown();
-			}
-		}
-
-		static void initializeStaticHelpers(int numSubtasks) {
-			completedCheckpointsLatch = new CountDownLatch(numSubtasks);
-			recoveredStates = new long[numSubtasks];
-		}
-
-		static void awaitCompletedCheckpoints() throws InterruptedException {
-			completedCheckpointsLatch.await();
-		}
-
-		static long[] getRecoveredStates() {
-			return recoveredStates;
-		}
-
-		private static OperatorStateHandle extractSingletonOperatorState(TaskStateSnapshot taskStateHandles) {
-			Set<Map.Entry<OperatorID, OperatorSubtaskState>> subtaskStateMappings = taskStateHandles.getSubtaskStateMappings();
-			Preconditions.checkNotNull(subtaskStateMappings);
-			Preconditions.checkState(subtaskStateMappings.size()  == 1);
-			OperatorSubtaskState subtaskState = subtaskStateMappings.iterator().next().getValue();
-			Collection<OperatorStateHandle> managedOperatorState =
-				Preconditions.checkNotNull(subtaskState).getManagedOperatorState();
-			Preconditions.checkNotNull(managedOperatorState);
-			Preconditions.checkState(managedOperatorState.size()  == 1);
-			return managedOperatorState.iterator().next();
-		}
-	}
-}


[flink] 04/06: [hotfix][tests] Remove BlobServer from JobManagerRunnerTest

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9d9863e9119e6394ecee68c80f7ae22ce4c7aa59
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Jan 18 13:43:37 2019 +0100

    [hotfix][tests] Remove BlobServer from JobManagerRunnerTest
---
 .../runtime/blob/VoidPermanentBlobService.java     | 41 +++++++++++++
 .../ContextClassLoaderLibraryCacheManager.java     | 70 ++++++++++++++++++++++
 .../runtime/jobmaster/JobManagerRunnerTest.java    | 34 +++++------
 .../TestingJobManagerSharedServicesBuilder.java    |  3 +-
 4 files changed, 130 insertions(+), 18 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/VoidPermanentBlobService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/VoidPermanentBlobService.java
new file mode 100644
index 0000000..d431a90
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/VoidPermanentBlobService.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * {@link PermanentBlobService} implementation for testing purposes which holds no files: {@link #getFile} always fails with an {@link IOException}.
+ */
+public enum VoidPermanentBlobService implements PermanentBlobService {
+	INSTANCE;
+
+	@Override
+	public File getFile(JobID jobId, PermanentBlobKey key) throws IOException {
+		throw new IOException(String.format("Could not find file for job id %s and key %s.", jobId, key));
+	}
+
+	@Override
+	public void close() throws IOException {
+
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/ContextClassLoaderLibraryCacheManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/ContextClassLoaderLibraryCacheManager.java
new file mode 100644
index 0000000..9b866b3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/ContextClassLoaderLibraryCacheManager.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.execution.librarycache;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+import javax.annotation.Nonnull;
+
+import java.net.URL;
+import java.util.Collection;
+
+/**
+ * {@link LibraryCacheManager} implementation which always returns the class loader that loaded this class (not the thread's context class loader).
+ */
+public enum ContextClassLoaderLibraryCacheManager implements LibraryCacheManager {
+	INSTANCE;
+
+	@Override
+	public ClassLoader getClassLoader(JobID id) {
+		return getClass().getClassLoader();
+	}
+
+	@Override
+	public void registerJob(JobID id, Collection<PermanentBlobKey> requiredJarFiles, Collection<URL> requiredClasspaths) {
+
+	}
+
+	@Override
+	public void registerTask(JobID id, ExecutionAttemptID execution, Collection<PermanentBlobKey> requiredJarFiles, Collection<URL> requiredClasspaths) {
+
+	}
+
+	@Override
+	public void unregisterTask(JobID id, ExecutionAttemptID execution) {
+
+	}
+
+	@Override
+	public void unregisterJob(JobID id) {
+
+	}
+
+	@Override
+	public void shutdown() {
+
+	}
+
+	@Override
+	public boolean hasClassLoader(@Nonnull JobID jobId) {
+		return true;
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
index bf2576b..da5a9ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
@@ -21,11 +21,11 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.blob.VoidPermanentBlobService;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -72,8 +72,6 @@ public class JobManagerRunnerTest extends TestLogger {
 
 	private static TestingRpcService rpcService;
 
-	private static BlobServer blobServer;
-
 	private static HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
 
 	private static JobManagerSharedServices jobManagerSharedServices;
@@ -93,11 +91,7 @@ public class JobManagerRunnerTest extends TestLogger {
 
 		configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
 
-		blobServer = new BlobServer(
-			configuration,
-			new VoidBlobStore());
-
-		jobManagerSharedServices = JobManagerSharedServices.fromConfiguration(configuration, blobServer);
+		jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build();
 
 		final JobVertex jobVertex = new JobVertex("Test vertex");
 		jobVertex.setInvokableClass(NoOpInvokable.class);
@@ -130,10 +124,6 @@ public class JobManagerRunnerTest extends TestLogger {
 			jobManagerSharedServices.shutdown();
 		}
 
-		if (blobServer != null) {
-			blobServer.close();
-		}
-
 		if (rpcService != null) {
 			rpcService.stopService();
 		}
@@ -208,13 +198,18 @@ public class JobManagerRunnerTest extends TestLogger {
 
 	@Test
 	public void testLibraryCacheManagerRegistration() throws Exception {
-		final JobManagerRunner jobManagerRunner = createJobManagerRunner();
+		final BlobLibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(
+			VoidPermanentBlobService.INSTANCE,
+			FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
+			new String[]{});
+		final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder()
+			.setLibraryCacheManager(libraryCacheManager)
+			.build();
+		final JobManagerRunner jobManagerRunner = createJobManagerRunner(jobManagerSharedServices);
 
 		try {
 			jobManagerRunner.start();
 
-			final LibraryCacheManager libraryCacheManager = jobManagerSharedServices.getLibraryCacheManager();
-
 			final JobID jobID = jobGraph.getJobID();
 			assertThat(libraryCacheManager.hasClassLoader(jobID), is(true));
 
@@ -228,6 +223,11 @@ public class JobManagerRunnerTest extends TestLogger {
 
 	@Nonnull
 	private JobManagerRunner createJobManagerRunner() throws Exception {
+		return createJobManagerRunner(jobManagerSharedServices);
+	}
+
+	@Nonnull
+	private JobManagerRunner createJobManagerRunner(final JobManagerSharedServices jobManagerSharedServices) throws Exception {
 		return new JobManagerRunner(
 			ResourceID.generate(),
 			jobGraph,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java
index ae20d1e..1edffe4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.execution.librarycache.ContextClassLoaderLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.restart.NoOrFixedIfCheckpointingEnabledRestartStrategyFactory;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
@@ -51,7 +52,7 @@ public class TestingJobManagerSharedServicesBuilder {
 
 	public TestingJobManagerSharedServicesBuilder() {
 		scheduledExecutorService = TestingUtils.defaultExecutor();
-		libraryCacheManager = mock(LibraryCacheManager.class);
+		libraryCacheManager = ContextClassLoaderLibraryCacheManager.INSTANCE;
 		restartStrategyFactory = new NoOrFixedIfCheckpointingEnabledRestartStrategyFactory();
 		stackTraceSampleCoordinator = mock(StackTraceSampleCoordinator.class);
 		backPressureStatsTracker = VoidBackPressureStatsTracker.INSTANCE;


[flink] 03/06: [hotfix] Move BlobWriter into JobMangerSharedServices

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 35a3d7918d9d19ca9f718137179e6391854dc84a
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Jan 18 12:03:05 2019 +0100

    [hotfix] Move BlobWriter into JobMangerSharedServices
---
 .../apache/flink/runtime/dispatcher/Dispatcher.java    |  4 ----
 .../flink/runtime/jobmaster/JobManagerRunner.java      |  3 ---
 .../runtime/jobmaster/JobManagerSharedServices.java    | 18 ++++++++++++++++--
 .../org/apache/flink/runtime/jobmaster/JobMaster.java  |  3 +--
 .../flink/runtime/dispatcher/DispatcherTest.java       |  6 ++----
 .../dispatcher/TestingJobManagerRunnerFactory.java     |  2 --
 .../flink/runtime/jobmaster/JobManagerRunnerTest.java  |  5 ++---
 .../apache/flink/runtime/jobmaster/JobMasterTest.java  | 14 --------------
 .../TestingJobManagerSharedServicesBuilder.java        | 12 +++++++++++-
 9 files changed, 32 insertions(+), 35 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 2330180..544a438 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -314,7 +314,6 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 					rpcService,
 					highAvailabilityServices,
 					heartbeatServices,
-					blobServer,
 					jobManagerSharedServices,
 					new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
 					fatalErrorHandler)),
@@ -1024,7 +1023,6 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 			RpcService rpcService,
 			HighAvailabilityServices highAvailabilityServices,
 			HeartbeatServices heartbeatServices,
-			BlobServer blobServer,
 			JobManagerSharedServices jobManagerServices,
 			JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
 			FatalErrorHandler fatalErrorHandler) throws Exception;
@@ -1044,7 +1042,6 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 				RpcService rpcService,
 				HighAvailabilityServices highAvailabilityServices,
 				HeartbeatServices heartbeatServices,
-				BlobServer blobServer,
 				JobManagerSharedServices jobManagerServices,
 				JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
 				FatalErrorHandler fatalErrorHandler) throws Exception {
@@ -1055,7 +1052,6 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 				rpcService,
 				highAvailabilityServices,
 				heartbeatServices,
-				blobServer,
 				jobManagerServices,
 				jobManagerJobMetricGroupFactory,
 				fatalErrorHandler);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 300462a..c10a535 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
@@ -109,7 +108,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 			final RpcService rpcService,
 			final HighAvailabilityServices haServices,
 			final HeartbeatServices heartbeatServices,
-			final BlobServer blobServer,
 			final JobManagerSharedServices jobManagerSharedServices,
 			final JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
 			final FatalErrorHandler fatalErrorHandler) throws Exception {
@@ -163,7 +161,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 				slotPoolFactory,
 				jobManagerSharedServices,
 				heartbeatServices,
-				blobServer,
 				jobManagerJobMetricGroupFactory,
 				this,
 				fatalErrorHandler,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java
index c1e910c..6c6696f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
@@ -36,6 +37,8 @@ import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.util.ExceptionUtils;
 
+import javax.annotation.Nonnull;
+
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -60,18 +63,23 @@ public class JobManagerSharedServices {
 
 	private final BackPressureStatsTracker backPressureStatsTracker;
 
+	@Nonnull
+	private final BlobWriter blobWriter;
+
 	public JobManagerSharedServices(
 			ScheduledExecutorService scheduledExecutorService,
 			LibraryCacheManager libraryCacheManager,
 			RestartStrategyFactory restartStrategyFactory,
 			StackTraceSampleCoordinator stackTraceSampleCoordinator,
-			BackPressureStatsTracker backPressureStatsTracker) {
+			BackPressureStatsTracker backPressureStatsTracker,
+			@Nonnull BlobWriter blobWriter) {
 
 		this.scheduledExecutorService = checkNotNull(scheduledExecutorService);
 		this.libraryCacheManager = checkNotNull(libraryCacheManager);
 		this.restartStrategyFactory = checkNotNull(restartStrategyFactory);
 		this.stackTraceSampleCoordinator = checkNotNull(stackTraceSampleCoordinator);
 		this.backPressureStatsTracker = checkNotNull(backPressureStatsTracker);
+		this.blobWriter = blobWriter;
 	}
 
 	public ScheduledExecutorService getScheduledExecutorService() {
@@ -90,6 +98,11 @@ public class JobManagerSharedServices {
 		return backPressureStatsTracker;
 	}
 
+	@Nonnull
+	public BlobWriter getBlobWriter() {
+		return blobWriter;
+	}
+
 	/**
 	 * Shutdown the {@link JobMaster} services.
 	 *
@@ -171,6 +184,7 @@ public class JobManagerSharedServices {
 			libraryCacheManager,
 			RestartStrategyFactory.createRestartStrategyFactory(config),
 			stackTraceSampleCoordinator,
-			backPressureStatsTracker);
+			backPressureStatsTracker,
+			blobServer);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 5a5ca19..90b8b84 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -230,7 +230,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			SlotPoolFactory slotPoolFactory,
 			JobManagerSharedServices jobManagerSharedServices,
 			HeartbeatServices heartbeatServices,
-			BlobWriter blobWriter,
 			JobManagerJobMetricGroupFactory jobMetricGroupFactory,
 			OnCompletionActions jobCompletionActions,
 			FatalErrorHandler fatalErrorHandler,
@@ -245,7 +244,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 		this.jobGraph = checkNotNull(jobGraph);
 		this.rpcTimeout = jobMasterConfiguration.getRpcTimeout();
 		this.highAvailabilityServices = checkNotNull(highAvailabilityService);
-		this.blobWriter = checkNotNull(blobWriter);
+		this.blobWriter = jobManagerSharedServices.getBlobWriter();
 		this.scheduledExecutorService = jobManagerSharedServices.getScheduledExecutorService();
 		this.jobCompletionActions = checkNotNull(jobCompletionActions);
 		this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 677ce5a..e10b8bc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -697,10 +697,10 @@ public class DispatcherTest extends TestLogger {
 		}
 
 		@Override
-		public JobManagerRunner createJobManagerRunner(ResourceID resourceId, JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, BlobServer blobServer, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) throws Exception {
+		public JobManagerRunner createJobManagerRunner(ResourceID resourceId, JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) throws Exception {
 			jobManagerRunnerCreationLatch.run();
 
-			return super.createJobManagerRunner(resourceId, jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, blobServer, jobManagerSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler);
+			return super.createJobManagerRunner(resourceId, jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, jobManagerSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler);
 		}
 	}
 
@@ -754,7 +754,6 @@ public class DispatcherTest extends TestLogger {
 				RpcService rpcService,
 				HighAvailabilityServices highAvailabilityServices,
 				HeartbeatServices heartbeatServices,
-				BlobServer blobServer,
 				JobManagerSharedServices jobManagerSharedServices,
 				JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
 				FatalErrorHandler fatalErrorHandler) throws Exception {
@@ -769,7 +768,6 @@ public class DispatcherTest extends TestLogger {
 				rpcService,
 				highAvailabilityServices,
 				heartbeatServices,
-				blobServer,
 				jobManagerSharedServices,
 				jobManagerJobMetricGroupFactory,
 				fatalErrorHandler);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
index 63574cf..30e4af6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -75,7 +74,6 @@ class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFacto
 			RpcService rpcService,
 			HighAvailabilityServices highAvailabilityServices,
 			HeartbeatServices heartbeatServices,
-			BlobServer blobServer,
 			JobManagerSharedServices jobManagerSharedServices,
 			JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
 			FatalErrorHandler fatalErrorHandler) throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
index 08f6fe5..bf2576b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
@@ -61,7 +61,7 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
 /**
- * Tests for the {@link JobManagerRunner}
+ * Tests for the {@link JobManagerRunner}.
  */
 public class JobManagerRunnerTest extends TestLogger {
 
@@ -138,7 +138,7 @@ public class JobManagerRunnerTest extends TestLogger {
 			rpcService.stopService();
 		}
 	}
-	
+
 	@Test
 	public void testJobCompletion() throws Exception {
 		final JobManagerRunner jobManagerRunner = createJobManagerRunner();
@@ -235,7 +235,6 @@ public class JobManagerRunnerTest extends TestLogger {
 			rpcService,
 			haServices,
 			heartbeatServices,
-			blobServer,
 			jobManagerSharedServices,
 			UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
 			fatalErrorHandler);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 56d5044..9c51eab 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -34,8 +34,6 @@ import org.apache.flink.core.io.InputSplitSource;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.CheckpointProperties;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
@@ -193,8 +191,6 @@ public class JobMasterTest extends TestLogger {
 
 	private static HeartbeatServices heartbeatServices;
 
-	private BlobServer blobServer;
-
 	private Configuration configuration;
 
 	private ResourceID jmResourceId;
@@ -232,9 +228,6 @@ public class JobMasterTest extends TestLogger {
 		haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
 
 		configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
-		blobServer = new BlobServer(configuration, new VoidBlobStore());
-
-		blobServer.start();
 	}
 
 	@After
@@ -243,10 +236,6 @@ public class JobMasterTest extends TestLogger {
 			testingFatalErrorHandler.rethrowError();
 		}
 
-		if (blobServer != null) {
-			blobServer.close();
-		}
-
 		rpcService.clearGateways();
 	}
 
@@ -283,7 +272,6 @@ public class JobMasterTest extends TestLogger {
 				DefaultSlotPoolFactory.fromConfiguration(configuration, rpcService1),
 				jobManagerSharedServices,
 				heartbeatServices,
-				blobServer,
 				UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
 				new NoOpOnCompletionActions(),
 				testingFatalErrorHandler,
@@ -1339,7 +1327,6 @@ public class JobMasterTest extends TestLogger {
 			DefaultSlotPoolFactory.fromConfiguration(configuration, rpcService),
 			new TestingJobManagerSharedServicesBuilder().build(),
 			heartbeatServices,
-			blobServer,
 			UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
 			new NoOpOnCompletionActions(),
 			testingFatalErrorHandler,
@@ -1553,7 +1540,6 @@ public class JobMasterTest extends TestLogger {
 			DefaultSlotPoolFactory.fromConfiguration(configuration, rpcService),
 			jobManagerSharedServices,
 			heartbeatServices,
-			blobServer,
 			UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
 			new NoOpOnCompletionActions(),
 			testingFatalErrorHandler,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java
index 030e4e6..ae20d1e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.jobmaster;
 
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.restart.NoOrFixedIfCheckpointingEnabledRestartStrategyFactory;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
@@ -45,12 +47,15 @@ public class TestingJobManagerSharedServicesBuilder {
 
 	private BackPressureStatsTracker backPressureStatsTracker;
 
+	private BlobWriter blobWriter;
+
 	public TestingJobManagerSharedServicesBuilder() {
 		scheduledExecutorService = TestingUtils.defaultExecutor();
 		libraryCacheManager = mock(LibraryCacheManager.class);
 		restartStrategyFactory = new NoOrFixedIfCheckpointingEnabledRestartStrategyFactory();
 		stackTraceSampleCoordinator = mock(StackTraceSampleCoordinator.class);
 		backPressureStatsTracker = VoidBackPressureStatsTracker.INSTANCE;
+		blobWriter = VoidBlobWriter.getInstance();
 	}
 
 	public TestingJobManagerSharedServicesBuilder setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
@@ -80,12 +85,17 @@ public class TestingJobManagerSharedServicesBuilder {
 
 	}
 
+	public void setBlobWriter(BlobWriter blobWriter) {
+		this.blobWriter = blobWriter;
+	}
+
 	public JobManagerSharedServices build() {
 		return new JobManagerSharedServices(
 			scheduledExecutorService,
 			libraryCacheManager,
 			restartStrategyFactory,
 			stackTraceSampleCoordinator,
-			backPressureStatsTracker);
+			backPressureStatsTracker,
+			blobWriter);
 	}
 }


[flink] 01/06: [hotfix] Remove unused JobMasterGateway#requestClassloadingProps

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0a8ed648893ebf502d810c2cf3353ddaeebd994a
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Jan 18 11:57:35 2019 +0100

    [hotfix] Remove unused JobMasterGateway#requestClassloadingProps
---
 .../main/java/org/apache/flink/runtime/jobmaster/JobMaster.java  | 9 ---------
 .../org/apache/flink/runtime/jobmaster/JobMasterGateway.java     | 6 ------
 .../flink/runtime/jobmaster/utils/TestingJobMasterGateway.java   | 5 -----
 3 files changed, 20 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 2fdf79c..f892fd7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -70,7 +70,6 @@ import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
 import org.apache.flink.runtime.jobmaster.exceptions.JobModificationException;
 import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
-import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolFactory;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway;
@@ -800,14 +799,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	}
 
 	@Override
-	public CompletableFuture<ClassloadingProps> requestClassloadingProps() {
-		return CompletableFuture.completedFuture(
-			new ClassloadingProps(blobServer.getPort(),
-				executionGraph.getRequiredJarFiles(),
-				executionGraph.getRequiredClasspaths()));
-	}
-
-	@Override
 	public CompletableFuture<Collection<SlotOffer>> offerSlots(
 			final ResourceID taskManagerId,
 			final Collection<SlotOffer> slots,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index bc073c1..8854a00 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.registration.RegistrationResponse;
@@ -175,11 +174,6 @@ public interface JobMasterGateway extends
 		final Exception cause);
 
 	/**
-	 * Request the classloading props of this job.
-	 */
-	CompletableFuture<ClassloadingProps> requestClassloadingProps();
-
-	/**
 	 * Offers the given slots to the job manager. The response contains the set of accepted slots.
 	 *
 	 * @param taskManagerId identifying the task manager
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
index f5f7f8e..beb4c95 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
@@ -271,11 +271,6 @@ public class TestingJobMasterGateway implements JobMasterGateway {
 	}
 
 	@Override
-	public CompletableFuture<ClassloadingProps> requestClassloadingProps() {
-		return classloadingPropsSupplier.get();
-	}
-
-	@Override
 	public CompletableFuture<Collection<SlotOffer>> offerSlots(ResourceID taskManagerId, Collection<SlotOffer> slots, Time timeout) {
 		return offerSlotsFunction.apply(taskManagerId, slots);
 	}


[flink] 05/06: [hotfix][tests] Add TestingHighAvailabilityServicesBuilder

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8ce4e0c26398d9af427adf83a6a77bfa549a6cf1
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Jan 18 14:24:56 2019 +0100

    [hotfix][tests] Add TestingHighAvailabilityServicesBuilder
    
    The builder creates a TestingHighAvailabilityServices with the standalone
    implementation of the individual services.
---
 .../flink/runtime/dispatcher/DispatcherHATest.java |   9 +-
 .../TestingHighAvailabilityServices.java           |  18 ++-
 .../TestingHighAvailabilityServicesBuilder.java    | 136 +++++++++++++++++++++
 3 files changed, 156 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
index 2af7271..a6ad3fb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -27,9 +27,9 @@ import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
@@ -156,11 +156,10 @@ public class DispatcherHATest extends TestLogger {
 	@Test
 	public void testRevokeLeadershipTerminatesJobManagerRunners() throws Exception {
 
-		final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
-		highAvailabilityServices.setSubmittedJobGraphStore(new StandaloneSubmittedJobGraphStore());
-
 		final TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
-		highAvailabilityServices.setDispatcherLeaderElectionService(leaderElectionService);
+		final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServicesBuilder()
+			.setDispatcherLeaderElectionService(leaderElectionService)
+			.build();
 
 		final ArrayBlockingQueue<DispatcherId> fencingTokens = new ArrayBlockingQueue<>(2);
 		final HATestingDispatcher dispatcher = createDispatcher(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 2489f16..6040335 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 
 import java.io.IOException;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
 
 /**
  * A variant of the HighAvailabilityServices for testing. Each individual service can be set
@@ -42,6 +43,10 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 
 	private volatile LeaderRetrievalService webMonitorEndpointLeaderRetriever;
 
+	private volatile Function<JobID, LeaderRetrievalService> jobMasterLeaderRetrieverFunction = ignored -> null;
+
+	private volatile Function<JobID, LeaderElectionService> jobMasterLeaderElectionServiceFunction = ignored -> null;
+
 	private ConcurrentHashMap<JobID, LeaderRetrievalService> jobMasterLeaderRetrievers = new ConcurrentHashMap<>();
 
 	private ConcurrentHashMap<JobID, LeaderElectionService> jobManagerLeaderElectionServices = new ConcurrentHashMap<>();
@@ -105,6 +110,15 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 	public void setRunningJobsRegistry(RunningJobsRegistry runningJobsRegistry) {
 		this.runningJobsRegistry = runningJobsRegistry;
 	}
+
+	public void setJobMasterLeaderElectionServiceFunction(Function<JobID, LeaderElectionService> jobMasterLeaderElectionServiceFunction) {
+		this.jobMasterLeaderElectionServiceFunction = jobMasterLeaderElectionServiceFunction;
+	}
+
+	public void setJobMasterLeaderRetrieverFunction(Function<JobID, LeaderRetrievalService> jobMasterLeaderRetrieverFunction) {
+		this.jobMasterLeaderRetrieverFunction = jobMasterLeaderRetrieverFunction;
+	}
+
 	// ------------------------------------------------------------------------
 	//  HA Services Methods
 	// ------------------------------------------------------------------------
@@ -131,7 +145,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 
 	@Override
 	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
-		LeaderRetrievalService service = this.jobMasterLeaderRetrievers.get(jobID);
+		LeaderRetrievalService service = jobMasterLeaderRetrievers.computeIfAbsent(jobID, jobMasterLeaderRetrieverFunction);
 		if (service != null) {
 			return service;
 		} else {
@@ -173,7 +187,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 
 	@Override
 	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
-		LeaderElectionService service = this.jobManagerLeaderElectionServices.get(jobID);
+		LeaderElectionService service = jobManagerLeaderElectionServices.computeIfAbsent(jobID, jobMasterLeaderElectionServiceFunction);
 
 		if (service != null) {
 			return service;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServicesBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServicesBuilder.java
new file mode 100644
index 0000000..247860a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServicesBuilder.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneRunningJobsRegistry;
+import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
+
+import java.util.function.Function;
+
+/**
+ * Builder for the {@link TestingHighAvailabilityServices}. Every service defaults to its standalone implementation (leader retrievers use "localhost" and {@link HighAvailabilityServices#DEFAULT_LEADER_ID}); each setter overrides one default and returns this builder for chaining, and {@link #build()} creates a new instance populated with the current values.
+ */
+public class TestingHighAvailabilityServicesBuilder {
+
+	private LeaderRetrievalService resourceManagerLeaderRetriever = new StandaloneLeaderRetrievalService("localhost", HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+	private LeaderRetrievalService dispatcherLeaderRetriever = new StandaloneLeaderRetrievalService("localhost", HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+	private LeaderRetrievalService webMonitorEndpointLeaderRetriever = new StandaloneLeaderRetrievalService("localhost", HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+	private Function<JobID, LeaderRetrievalService> jobMasterLeaderRetrieverFunction = jobId -> new StandaloneLeaderRetrievalService("localhost", HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+	private Function<JobID, LeaderElectionService> jobMasterLeaderElectionServiceFunction = jobId -> new StandaloneLeaderElectionService();
+
+	private LeaderElectionService resourceManagerLeaderElectionService = new StandaloneLeaderElectionService();
+
+	private LeaderElectionService dispatcherLeaderElectionService = new StandaloneLeaderElectionService();
+
+	private LeaderElectionService webMonitorEndpointLeaderElectionService = new StandaloneLeaderElectionService();
+
+	private CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory();
+
+	private SubmittedJobGraphStore submittedJobGraphStore = new StandaloneSubmittedJobGraphStore();
+
+	private RunningJobsRegistry runningJobsRegistry = new StandaloneRunningJobsRegistry();
+
+	public TestingHighAvailabilityServices build() {
+		final TestingHighAvailabilityServices testingHighAvailabilityServices = new TestingHighAvailabilityServices();
+
+		testingHighAvailabilityServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever);
+		testingHighAvailabilityServices.setDispatcherLeaderRetriever(dispatcherLeaderRetriever);
+		testingHighAvailabilityServices.setWebMonitorEndpointLeaderRetriever(webMonitorEndpointLeaderRetriever);
+
+		testingHighAvailabilityServices.setJobMasterLeaderRetrieverFunction(jobMasterLeaderRetrieverFunction);
+		testingHighAvailabilityServices.setJobMasterLeaderElectionServiceFunction(jobMasterLeaderElectionServiceFunction);
+
+		testingHighAvailabilityServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService);
+		testingHighAvailabilityServices.setDispatcherLeaderElectionService(dispatcherLeaderElectionService);
+		testingHighAvailabilityServices.setWebMonitorEndpointLeaderElectionService(webMonitorEndpointLeaderElectionService);
+
+		testingHighAvailabilityServices.setCheckpointRecoveryFactory(checkpointRecoveryFactory);
+		testingHighAvailabilityServices.setSubmittedJobGraphStore(submittedJobGraphStore);
+		testingHighAvailabilityServices.setRunningJobsRegistry(runningJobsRegistry);
+
+		return testingHighAvailabilityServices;
+	}
+
+	public TestingHighAvailabilityServicesBuilder setResourceManagerLeaderRetriever(LeaderRetrievalService resourceManagerLeaderRetriever) {
+		this.resourceManagerLeaderRetriever = resourceManagerLeaderRetriever;
+		return this;
+	}
+
+	public TestingHighAvailabilityServicesBuilder setDispatcherLeaderRetriever(LeaderRetrievalService dispatcherLeaderRetriever) {
+		this.dispatcherLeaderRetriever = dispatcherLeaderRetriever;
+		return this;
+	}
+
+	public TestingHighAvailabilityServicesBuilder setWebMonitorEndpointLeaderRetriever(LeaderRetrievalService webMonitorEndpointLeaderRetriever) {
+		this.webMonitorEndpointLeaderRetriever = webMonitorEndpointLeaderRetriever;
+		return this;
+	}
+
+	public TestingHighAvailabilityServicesBuilder setJobMasterLeaderRetrieverFunction(Function<JobID, LeaderRetrievalService> jobMasterLeaderRetrieverFunction) {
+		this.jobMasterLeaderRetrieverFunction = jobMasterLeaderRetrieverFunction;
+		return this;
+	}
+
+	public TestingHighAvailabilityServicesBuilder setJobMasterLeaderElectionServiceFunction(Function<JobID, LeaderElectionService> jobMasterLeaderElectionServiceFunction) {
+		this.jobMasterLeaderElectionServiceFunction = jobMasterLeaderElectionServiceFunction;
+		return this;
+	}
+
+	public TestingHighAvailabilityServicesBuilder setResourceManagerLeaderElectionService(LeaderElectionService resourceManagerLeaderElectionService) {
+		this.resourceManagerLeaderElectionService = resourceManagerLeaderElectionService;
+		return this;
+	}
+
+	public TestingHighAvailabilityServicesBuilder setDispatcherLeaderElectionService(LeaderElectionService dispatcherLeaderElectionService) {
+		this.dispatcherLeaderElectionService = dispatcherLeaderElectionService;
+		return this;
+	}
+
+	public TestingHighAvailabilityServicesBuilder setWebMonitorEndpointLeaderElectionService(LeaderElectionService webMonitorEndpointLeaderElectionService) {
+		this.webMonitorEndpointLeaderElectionService = webMonitorEndpointLeaderElectionService;
+		return this;
+	}
+
+	public TestingHighAvailabilityServicesBuilder setCheckpointRecoveryFactory(CheckpointRecoveryFactory checkpointRecoveryFactory) {
+		this.checkpointRecoveryFactory = checkpointRecoveryFactory;
+		return this;
+	}
+
+	public TestingHighAvailabilityServicesBuilder setSubmittedJobGraphStore(SubmittedJobGraphStore submittedJobGraphStore) {
+		this.submittedJobGraphStore = submittedJobGraphStore;
+		return this;
+	}
+
+	public TestingHighAvailabilityServicesBuilder setRunningJobsRegistry(RunningJobsRegistry runningJobsRegistry) {
+		this.runningJobsRegistry = runningJobsRegistry;
+		return this;
+	}
+}