You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/13 13:26:30 UTC

[1/3] flink git commit: [hotfix][tests] Extract SubmittedJobGraphStore implementation from JobManagerHARecoveryTest

Repository: flink
Updated Branches:
  refs/heads/master 01d0d256d -> 7ddb674cb


[hotfix][tests] Extract SubmittedJobGraphStore implementation from JobManagerHARecoveryTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/87749ca4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/87749ca4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/87749ca4

Branch: refs/heads/master
Commit: 87749ca40a8b40f609fb46957be2453797f75ed3
Parents: 01d0d25
Author: gyao <ga...@data-artisans.com>
Authored: Thu Nov 30 15:37:30 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Dec 13 14:21:51 2017 +0100

----------------------------------------------------------------------
 .../jobmanager/JobManagerHARecoveryTest.java    | 53 ++----------
 .../InMemorySubmittedJobGraphStore.java         | 84 ++++++++++++++++++++
 2 files changed, 90 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/87749ca4/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index f86e7e1..12bb95e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -74,6 +74,7 @@ import org.apache.flink.runtime.testingUtils.TestingMessages;
 import org.apache.flink.runtime.testingUtils.TestingTaskManager;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.InMemorySubmittedJobGraphStore;
 import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
 import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
 import org.apache.flink.util.InstantiationUtil;
@@ -170,7 +171,8 @@ public class JobManagerHARecoveryTest extends TestLogger {
 		try {
 			Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
 
-			MySubmittedJobGraphStore mySubmittedJobGraphStore = new MySubmittedJobGraphStore();
+			InMemorySubmittedJobGraphStore submittedJobGraphStore = new InMemorySubmittedJobGraphStore();
+			submittedJobGraphStore.start(null);
 			CompletedCheckpointStore checkpointStore = new RecoverableCompletedCheckpointStore();
 			CheckpointIDCounter checkpointCounter = new StandaloneCheckpointIDCounter();
 			CheckpointRecoveryFactory checkpointStateFactory = new MyCheckpointRecoveryFactory(checkpointStore, checkpointCounter);
@@ -204,7 +206,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
 				new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
 				timeout,
 				myLeaderElectionService,
-				mySubmittedJobGraphStore,
+				submittedJobGraphStore,
 				checkpointStateFactory,
 				jobRecoveryTimeout,
 				UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
@@ -286,7 +288,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
 			// check that the job gets removed from the JobManager
 			Await.ready(jobRemoved, deadline.timeLeft());
 			// but stays in the submitted job graph store
-			assertTrue(mySubmittedJobGraphStore.contains(jobGraph.getJobID()));
+			assertTrue(submittedJobGraphStore.contains(jobGraph.getJobID()));
 
 			Future<Object> jobRunning = gateway.ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.RUNNING), deadline.timeLeft());
 
@@ -306,7 +308,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
 			Await.ready(jobFinished, deadline.timeLeft());
 
 			// check that the job has been removed from the submitted job graph store
-			assertFalse(mySubmittedJobGraphStore.contains(jobGraph.getJobID()));
+			assertFalse(submittedJobGraphStore.contains(jobGraph.getJobID()));
 
 			// Check that state has been recovered
 			long[] recoveredStates = BlockingStatefulInvokable.getRecoveredStates();
@@ -482,49 +484,6 @@ public class JobManagerHARecoveryTest extends TestLogger {
 		}
 	}
 
-	static class MySubmittedJobGraphStore implements SubmittedJobGraphStore {
-
-		Map<JobID, SubmittedJobGraph> storedJobs = new HashMap<>();
-
-		@Override
-		public void start(SubmittedJobGraphListener jobGraphListener) throws Exception {
-
-		}
-
-		@Override
-		public void stop() throws Exception {
-
-		}
-
-		@Override
-		public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
-			if (storedJobs.containsKey(jobId)) {
-				return storedJobs.get(jobId);
-			} else {
-				return null;
-			}
-		}
-
-		@Override
-		public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
-			storedJobs.put(jobGraph.getJobId(), jobGraph);
-		}
-
-		@Override
-		public void removeJobGraph(JobID jobId) throws Exception {
-			storedJobs.remove(jobId);
-		}
-
-		@Override
-		public Collection<JobID> getJobIds() throws Exception {
-			return storedJobs.keySet();
-		}
-
-		boolean contains(JobID jobId) {
-			return storedJobs.containsKey(jobId);
-		}
-	}
-
 	public static class BlockingInvokable extends AbstractInvokable {
 
 		private static boolean blocking = true;

http://git-wip-us.apache.org/repos/asf/flink/blob/87749ca4/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
new file mode 100644
index 0000000..bf85771
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * In-Memory implementation of {@link SubmittedJobGraphStore} for testing purposes.
+ */
+public class InMemorySubmittedJobGraphStore implements SubmittedJobGraphStore {
+
+	private final Map<JobID, SubmittedJobGraph> storedJobs = new HashMap<>();
+
+	private volatile boolean started;
+
+	@Override
+	public void start(@Nullable SubmittedJobGraphListener jobGraphListener) throws Exception {
+		started = true;
+	}
+
+	@Override
+	public void stop() throws Exception {
+		started = false;
+	}
+
+	@Override
+	public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
+		verifyIsStarted();
+		return storedJobs.getOrDefault(jobId, null);
+	}
+
+	@Override
+	public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
+		verifyIsStarted();
+		storedJobs.put(jobGraph.getJobId(), jobGraph);
+	}
+
+	@Override
+	public void removeJobGraph(JobID jobId) throws Exception {
+		verifyIsStarted();
+		storedJobs.remove(jobId);
+	}
+
+	@Override
+	public Collection<JobID> getJobIds() throws Exception {
+		verifyIsStarted();
+		return storedJobs.keySet();
+	}
+
+	public boolean contains(JobID jobId) {
+		verifyIsStarted();
+		return storedJobs.containsKey(jobId);
+	}
+
+	private void verifyIsStarted() {
+		Preconditions.checkState(started, "Not running. Forgot to call start()?");
+	}
+
+}


[3/3] flink git commit: [FLINK-8176][flip6] Start SubmittedJobGraphStore in Dispatcher

Posted by tr...@apache.org.
[FLINK-8176][flip6] Start SubmittedJobGraphStore in Dispatcher

Implement SubmittedJobGraphListener interface in Dispatcher

Call start() on the SubmittedJobGraphStore with the Dispatcher as the listener.
To enable this, the Dispatcher must implement the SubmittedJobGraphListener
interface. Add simple unit tests for the new methods. Refactor DispatcherTest to
remove redundancy.

[FLINK-8176][flip6] Make InMemorySubmittedJobGraphStore thread-safe

[FLINK-8176][flip6] Add method isStarted() to TestingLeaderElectionService

[FLINK-8176][flip6] Return same RunningJobsRegistry instance from TestingHighAvailabilityServices

[FLINK-8176][flip6] Fix race conditions in Dispatcher and DispatcherTest

Check whether a JobManagerRunner already exists for the job before submitting it.
Replace the JobManagerRunner mock used in tests with a real instance.
Do not run job graph recovery in the actor's main thread when a job graph is
recovered from SubmittedJobGraphListener#onAddedJobGraph(JobID).

[FLINK-8176][flip6] Rename variables in DispatcherTest

[FLINK-8176][flip6] Remove injectMocks in DispatcherTest

[FLINK-8176][flip6] Update Dispatcher's SubmittedJobGraphListener callbacks

Always attempt the job submission when onAddedJobGraph is called, and the job
removal when onRemovedJobGraph is called. The checks in submitJob and removeJob
are sufficient, so the callbacks need no additional guards.

This closes #5107.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ddb674c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ddb674c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ddb674c

Branch: refs/heads/master
Commit: 7ddb674cb17c35f17aa073d3bfd6897d7fc13b9e
Parents: 8941f63
Author: gyao <ga...@data-artisans.com>
Authored: Thu Nov 30 15:44:23 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Dec 13 14:21:52 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    |  46 +++-
 .../runtime/dispatcher/DispatcherTest.java      | 250 ++++++++++++-------
 .../TestingHighAvailabilityServices.java        |   4 +-
 .../TestingLeaderElectionService.java           |   9 +
 .../InMemorySubmittedJobGraphStore.java         |  26 +-
 5 files changed, 235 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7ddb674c/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 8a26f95..ea3a6ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.dispatcher;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
@@ -63,6 +64,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -73,7 +75,8 @@ import java.util.concurrent.CompletableFuture;
  * the jobs and to recover them in case of a master failure. Furthermore, it knows
  * about the state of the Flink session cluster.
  */
-public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> implements DispatcherGateway, LeaderContender {
+public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> implements
+	DispatcherGateway, LeaderContender, SubmittedJobGraphStore.SubmittedJobGraphListener {
 
 	public static final String DISPATCHER_NAME = "dispatcher";
 
@@ -173,6 +176,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	public void start() throws Exception {
 		super.start();
 
+		submittedJobGraphStore.start(this);
 		leaderElectionService.start(this);
 	}
 
@@ -197,7 +201,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 				new JobSubmissionException(jobId, "Could not retrieve the job status.", e));
 		}
 
-		if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.PENDING) {
+		if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.PENDING &&
+			!jobManagerRunners.containsKey(jobId)) {
 			try {
 				submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph, null));
 			} catch (Exception e) {
@@ -248,7 +253,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 	@Override
 	public CompletableFuture<Collection<JobID>> listJobs(Time timeout) {
-		return CompletableFuture.completedFuture(jobManagerRunners.keySet());
+		return CompletableFuture.completedFuture(
+			Collections.unmodifiableSet(new HashSet<>(jobManagerRunners.keySet())));
 	}
 
 	@Override
@@ -399,7 +405,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	/**
 	 * Recovers all jobs persisted via the submitted job graph store.
 	 */
-	private void recoverJobs() {
+	@VisibleForTesting
+	void recoverJobs() {
 		log.info("Recovering all persisted jobs.");
 
 		getRpcService().execute(
@@ -508,6 +515,37 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	}
 
 	//------------------------------------------------------
+	// SubmittedJobGraphListener
+	//------------------------------------------------------
+
+	@Override
+	public void onAddedJobGraph(final JobID jobId) {
+		getRpcService().execute(() -> {
+			final SubmittedJobGraph submittedJobGraph;
+			try {
+				submittedJobGraph = submittedJobGraphStore.recoverJobGraph(jobId);
+			} catch (final Exception e) {
+				log.error("Could not recover job graph for job {}.", jobId, e);
+				return;
+			}
+			runAsync(() -> {
+				submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT);
+			});
+		});
+	}
+
+	@Override
+	public void onRemovedJobGraph(final JobID jobId) {
+		runAsync(() -> {
+			try {
+				removeJob(jobId, false);
+			} catch (final Exception e) {
+				log.error("Could not remove job {}.", jobId, e);
+			}
+		});
+	}
+
+	//------------------------------------------------------
 	// Utility classes
 	//------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7ddb674c/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index d5b63d4..8627c8e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -20,57 +20,95 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
-import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.InMemorySubmittedJobGraphStore;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.rules.TestName;
 import org.mockito.Mockito;
 
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 /**
  * Test for the {@link Dispatcher} component.
  */
 public class DispatcherTest extends TestLogger {
 
+	private static RpcService rpcService;
+
+	private static final Time TIMEOUT = Time.seconds(10L);
+
+	private static final JobID TEST_JOB_ID = new JobID();
+
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
 	@Rule
 	public TestName name = new TestName();
 
-	private static RpcService rpcService;
-	private static final Time timeout = Time.seconds(10L);
+	private JobGraph jobGraph;
+
+	private TestingFatalErrorHandler fatalErrorHandler;
+
+	private SubmittedJobGraphStore submittedJobGraphStore;
+
+	private TestingLeaderElectionService dispatcherLeaderElectionService;
+
+	private TestingLeaderElectionService jobMasterLeaderElectionService;
+
+	private RunningJobsRegistry runningJobsRegistry;
+
+	/** Instance under test. */
+	private TestingDispatcher dispatcher;
 
 	@BeforeClass
 	public static void setup() {
@@ -86,60 +124,77 @@ public class DispatcherTest extends TestLogger {
 		}
 	}
 
-	/**
-	 * Tests that we can submit a job to the Dispatcher which then spawns a
-	 * new JobManagerRunner.
-	 */
-	@Test
-	public void testJobSubmission() throws Exception {
-		TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
-
-		TestingLeaderElectionService dispatcherLeaderElectionService = new TestingLeaderElectionService();
-		TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
-		haServices.setDispatcherLeaderElectionService(dispatcherLeaderElectionService);
-		haServices.setSubmittedJobGraphStore(new StandaloneSubmittedJobGraphStore());
+	@Before
+	public void setUp() throws Exception {
+		final JobVertex testVertex = new JobVertex("testVertex");
+		testVertex.setInvokableClass(NoOpInvokable.class);
+		jobGraph = new JobGraph(TEST_JOB_ID, "testJob", testVertex);
+		jobGraph.setAllowQueuedScheduling(true);
 
-		HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 10000L);
-		JobManagerRunner jobManagerRunner = mock(JobManagerRunner.class);
+		fatalErrorHandler = new TestingFatalErrorHandler();
+		final HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 10000L);
+		submittedJobGraphStore = spy(new InMemorySubmittedJobGraphStore());
 
-		final JobGraph jobGraph = mock(JobGraph.class);
-		final JobID jobId = new JobID();
-		when(jobGraph.getJobID()).thenReturn(jobId);
+		dispatcherLeaderElectionService = new TestingLeaderElectionService();
+		jobMasterLeaderElectionService = new TestingLeaderElectionService();
 
-		final TestingDispatcher dispatcher = new TestingDispatcher(
+		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+		haServices.setDispatcherLeaderElectionService(dispatcherLeaderElectionService);
+		haServices.setSubmittedJobGraphStore(submittedJobGraphStore);
+		haServices.setJobMasterLeaderElectionService(TEST_JOB_ID, jobMasterLeaderElectionService);
+		haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());
+		haServices.setResourceManagerLeaderRetriever(new TestingLeaderRetrievalService());
+		runningJobsRegistry = haServices.getRunningJobsRegistry();
+
+		final Configuration blobServerConfig = new Configuration();
+		blobServerConfig.setString(
+			BlobServerOptions.STORAGE_DIRECTORY,
+			temporaryFolder.newFolder().getAbsolutePath());
+
+		dispatcher = new TestingDispatcher(
 			rpcService,
 			Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
 			new Configuration(),
 			haServices,
 			mock(ResourceManagerGateway.class),
-			mock(BlobServer.class),
+			new BlobServer(blobServerConfig, new VoidBlobStore()),
 			heartbeatServices,
-			mock(MetricRegistryImpl.class),
+			new NoOpMetricRegistry(),
 			fatalErrorHandler,
-			jobManagerRunner,
-			jobId);
+			TEST_JOB_ID);
 
-		try {
-			dispatcher.start();
+		dispatcher.start();
+	}
 
-			CompletableFuture<UUID> leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
+	@After
+	public void tearDown() throws Exception {
+		try {
+			fatalErrorHandler.rethrowError();
+		} finally {
+			RpcUtils.terminateRpcEndpoint(dispatcher, TIMEOUT);
+		}
+	}
 
-			// wait for the leader to be elected
-			leaderFuture.get();
+	/**
+	 * Tests that we can submit a job to the Dispatcher which then spawns a
+	 * new JobManagerRunner.
+	 */
+	@Test
+	public void testJobSubmission() throws Exception {
+		CompletableFuture<UUID> leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
 
-			DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+		// wait for the leader to be elected
+		leaderFuture.get();
 
-			CompletableFuture<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, timeout);
+		DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
 
-			acknowledgeFuture.get();
+		CompletableFuture<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, TIMEOUT);
 
-			verify(jobManagerRunner, Mockito.timeout(timeout.toMilliseconds())).start();
+		acknowledgeFuture.get();
 
-			// check that no error has occurred
-			fatalErrorHandler.rethrowError();
-		} finally {
-			RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
-		}
+		assertTrue(
+			"jobManagerRunner was not started",
+			dispatcherLeaderElectionService.isStarted());
 	}
 
 	/**
@@ -147,61 +202,63 @@ public class DispatcherTest extends TestLogger {
 	 */
 	@Test
 	public void testLeaderElection() throws Exception {
-		TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
-		TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
-
 		UUID expectedLeaderSessionId = UUID.randomUUID();
-		CompletableFuture<UUID> leaderSessionIdFuture = new CompletableFuture<>();
-		SubmittedJobGraphStore mockSubmittedJobGraphStore = mock(SubmittedJobGraphStore.class);
-		TestingLeaderElectionService testingLeaderElectionService = new TestingLeaderElectionService() {
-			@Override
-			public void confirmLeaderSessionID(UUID leaderSessionId) {
-				super.confirmLeaderSessionID(leaderSessionId);
-				leaderSessionIdFuture.complete(leaderSessionId);
-			}
-		};
 
-		haServices.setSubmittedJobGraphStore(mockSubmittedJobGraphStore);
-		haServices.setDispatcherLeaderElectionService(testingLeaderElectionService);
-		HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
-		final JobID jobId = new JobID();
+		assertNull(dispatcherLeaderElectionService.getConfirmationFuture());
 
-		final TestingDispatcher dispatcher = new TestingDispatcher(
-			rpcService,
-			Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
-			new Configuration(),
-			haServices,
-			mock(ResourceManagerGateway.class),
-			mock(BlobServer.class),
-			heartbeatServices,
-			mock(MetricRegistryImpl.class),
-			fatalErrorHandler,
-			mock(JobManagerRunner.class),
-			jobId);
+		dispatcherLeaderElectionService.isLeader(expectedLeaderSessionId);
 
-		try {
-			dispatcher.start();
+		UUID actualLeaderSessionId = dispatcherLeaderElectionService.getConfirmationFuture()
+			.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+		assertEquals(expectedLeaderSessionId, actualLeaderSessionId);
+
+		verify(submittedJobGraphStore, Mockito.timeout(TIMEOUT.toMilliseconds()).atLeast(1)).getJobIds();
+	}
+
+	/**
+	 * Test callbacks from
+	 * {@link org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener}.
+	 */
+	@Test
+	public void testSubmittedJobGraphListener() throws Exception {
+		dispatcher.recoverJobsEnabled.set(false);
 
-			assertFalse(leaderSessionIdFuture.isDone());
+		dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
 
-			testingLeaderElectionService.isLeader(expectedLeaderSessionId);
+		final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
 
-			UUID actualLeaderSessionId = leaderSessionIdFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+		dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+		jobMasterLeaderElectionService.isLeader(UUID.randomUUID()).get();
 
-			assertEquals(expectedLeaderSessionId, actualLeaderSessionId);
+		final SubmittedJobGraph submittedJobGraph = submittedJobGraphStore.recoverJobGraph(TEST_JOB_ID);
 
-			verify(mockSubmittedJobGraphStore, Mockito.timeout(timeout.toMilliseconds()).atLeast(1)).getJobIds();
-		} finally {
-			RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
-		}
+		// pretend that other Dispatcher has removed job from submittedJobGraphStore
+		submittedJobGraphStore.removeJobGraph(TEST_JOB_ID);
+		dispatcher.onRemovedJobGraph(TEST_JOB_ID);
+		assertThat(dispatcherGateway.listJobs(TIMEOUT).get(), empty());
+
+		// pretend that other Dispatcher has added a job to submittedJobGraphStore
+		runningJobsRegistry.clearJob(TEST_JOB_ID);
+		submittedJobGraphStore.putJobGraph(submittedJobGraph);
+		dispatcher.onAddedJobGraph(TEST_JOB_ID);
+		dispatcher.submitJobLatch.await();
+		assertThat(dispatcherGateway.listJobs(TIMEOUT).get(), hasSize(1));
 	}
 
 	private static class TestingDispatcher extends Dispatcher {
 
-		private final JobManagerRunner jobManagerRunner;
 		private final JobID expectedJobId;
 
-		protected TestingDispatcher(
+		private final CountDownLatch submitJobLatch = new CountDownLatch(2);
+
+		/**
+		 * Controls whether existing jobs in {@link SubmittedJobGraphStore} should be recovered
+		 * when {@link TestingDispatcher} is granted leadership.
+		 * */
+		private final AtomicBoolean recoverJobsEnabled = new AtomicBoolean(true);
+
+		private TestingDispatcher(
 				RpcService rpcService,
 				String endpointId,
 				Configuration configuration,
@@ -211,7 +268,6 @@ public class DispatcherTest extends TestLogger {
 				HeartbeatServices heartbeatServices,
 				MetricRegistry metricRegistry,
 				FatalErrorHandler fatalErrorHandler,
-				JobManagerRunner jobManagerRunner,
 				JobID expectedJobId) throws Exception {
 			super(
 				rpcService,
@@ -225,7 +281,6 @@ public class DispatcherTest extends TestLogger {
 				fatalErrorHandler,
 				null);
 
-			this.jobManagerRunner = jobManagerRunner;
 			this.expectedJobId = expectedJobId;
 		}
 
@@ -243,7 +298,32 @@ public class DispatcherTest extends TestLogger {
 				FatalErrorHandler fatalErrorHandler) throws Exception {
 			assertEquals(expectedJobId, jobGraph.getJobID());
 
-			return jobManagerRunner;
+			return new JobManagerRunner(resourceId, jobGraph, configuration, rpcService,
+				highAvailabilityServices, heartbeatServices, jobManagerServices, metricRegistry,
+				onCompleteActions, fatalErrorHandler, null);
+		}
+
+		@Override
+		public CompletableFuture<Acknowledge> submitJob(final JobGraph jobGraph, final Time timeout) {
+			final CompletableFuture<Acknowledge> submitJobFuture = super.submitJob(jobGraph, timeout);
+
+			try {
+				submitJobFuture.get();
+			} catch (InterruptedException e) {
+				Thread.currentThread().interrupt();
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+
+			submitJobLatch.countDown();
+			return submitJobFuture;
+		}
+
+		@Override
+		void recoverJobs() {
+			if (recoverJobsEnabled.get()) {
+				super.recoverJobs();
+			}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ddb674c/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index dba7bef..db0b88e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -52,6 +52,8 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 
 	private volatile SubmittedJobGraphStore submittedJobGraphStore;
 
+	private final RunningJobsRegistry runningJobsRegistry = new StandaloneRunningJobsRegistry();
+
 	// ------------------------------------------------------------------------
 	//  Setters for mock / testing implementations
 	// ------------------------------------------------------------------------
@@ -185,7 +187,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 
 	@Override
 	public RunningJobsRegistry getRunningJobsRegistry() {
-		return new StandaloneRunningJobsRegistry();
+		return runningJobsRegistry;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7ddb674c/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
index d951db5..4ecb9b61 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
@@ -86,4 +86,13 @@ public class TestingLeaderElectionService implements LeaderElectionService {
 	public synchronized String getAddress() {
 		return contender.getAddress();
 	}
+
+	/**
+	 * Returns <code>true</code> if {@link #start(LeaderContender)} was called,
+	 * <code>false</code> otherwise.
+	 */
+	public synchronized boolean isStarted() {
+		return contender != null;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ddb674c/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
index bf85771..ee208ce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
@@ -26,9 +26,13 @@ import org.apache.flink.util.Preconditions;
 import javax.annotation.Nullable;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 
+import static java.util.Objects.requireNonNull;
+
 /**
  * In-Memory implementation of {@link SubmittedJobGraphStore} for testing purposes.
  */
@@ -36,43 +40,45 @@ public class InMemorySubmittedJobGraphStore implements SubmittedJobGraphStore {
 
 	private final Map<JobID, SubmittedJobGraph> storedJobs = new HashMap<>();
 
-	private volatile boolean started;
+	private boolean started;
 
 	@Override
-	public void start(@Nullable SubmittedJobGraphListener jobGraphListener) throws Exception {
+	public synchronized void start(@Nullable SubmittedJobGraphListener jobGraphListener) throws Exception {
 		started = true;
 	}
 
 	@Override
-	public void stop() throws Exception {
+	public synchronized void stop() throws Exception {
 		started = false;
 	}
 
 	@Override
-	public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
+	public synchronized SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
 		verifyIsStarted();
-		return storedJobs.getOrDefault(jobId, null);
+		return requireNonNull(
+			storedJobs.get(jobId),
+			"Job graph for job " + jobId + " does not exist");
 	}
 
 	@Override
-	public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
+	public synchronized void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
 		verifyIsStarted();
 		storedJobs.put(jobGraph.getJobId(), jobGraph);
 	}
 
 	@Override
-	public void removeJobGraph(JobID jobId) throws Exception {
+	public synchronized void removeJobGraph(JobID jobId) throws Exception {
 		verifyIsStarted();
 		storedJobs.remove(jobId);
 	}
 
 	@Override
-	public Collection<JobID> getJobIds() throws Exception {
+	public synchronized Collection<JobID> getJobIds() throws Exception {
 		verifyIsStarted();
-		return storedJobs.keySet();
+		return Collections.unmodifiableSet(new HashSet<>(storedJobs.keySet()));
 	}
 
-	public boolean contains(JobID jobId) {
+	public synchronized boolean contains(JobID jobId) {
 		verifyIsStarted();
 		return storedJobs.containsKey(jobId);
 	}


[2/3] flink git commit: [hotfix][Javadoc] Make first sentence in JobSubmissionException Javadoc end with period

Posted by tr...@apache.org.
[hotfix][Javadoc] Make first sentence in JobSubmissionException Javadoc end with period


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8941f636
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8941f636
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8941f636

Branch: refs/heads/master
Commit: 8941f636ba1550b1d934278ef0c13e6d9a354781
Parents: 87749ca
Author: gyao <ga...@data-artisans.com>
Authored: Mon Dec 4 19:58:26 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Dec 13 14:21:52 2017 +0100

----------------------------------------------------------------------
 .../org/apache/flink/runtime/client/JobSubmissionException.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8941f636/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionException.java
index 3cb0b9f..afed69a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionException.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.client;
 import org.apache.flink.api.common.JobID;
 
 /**
- * This exception denotes an error while submitting a job to the JobManager
+ * This exception denotes an error while submitting a job to the JobManager.
  */
 public class JobSubmissionException extends JobExecutionException {