You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text rendering).
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/16 21:38:42 UTC

[1/5] flink git commit: [hotfix] [tests] Migrate some test tasks to Java

Repository: flink
Updated Branches:
  refs/heads/flip-6 2486d3787 -> a19cae3b0


[hotfix] [tests] Migrate some test tasks to Java


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/da16b0a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/da16b0a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/da16b0a7

Branch: refs/heads/flip-6
Commit: da16b0a7b1bf5d771d07428ae485048ce540bbf7
Parents: 2486d37
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Oct 14 23:54:29 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Oct 16 22:01:32 2016 +0200

----------------------------------------------------------------------
 .../StackTraceSampleCoordinatorITCase.java      |  8 ++--
 .../checkpoint/CoordinatorShutdownTest.java     |  8 ++--
 .../ExecutionGraphMetricsTest.java              |  4 +-
 .../ExecutionGraphRestartTest.java              | 23 +++++------
 .../runtime/jobmanager/JobManagerTest.java      |  9 +++--
 .../flink/runtime/jobmanager/JobSubmitTest.java |  9 ++---
 .../runtime/taskmanager/TaskManagerTest.java    | 41 ++++++++++----------
 .../testtasks/BlockingNoOpInvokable.java        | 39 +++++++++++++++++++
 .../flink/runtime/testtasks/NoOpInvokable.java  | 30 ++++++++++++++
 .../runtime/testtasks/WaitingNoOpInvokable.java | 34 ++++++++++++++++
 .../TaskManagerLossFailsTasksTest.scala         |  3 +-
 .../runtime/jobmanager/JobManagerITCase.scala   |  1 +
 .../apache/flink/runtime/jobmanager/Tasks.scala | 20 ----------
 .../JobSubmissionFailsITCase.java               |  6 +--
 .../JobManagerHACheckpointRecoveryITCase.java   |  4 +-
 .../JobManagerHAJobGraphRecoveryITCase.java     |  4 +-
 .../jobmanager/JobManagerFailsITCase.scala      |  2 +-
 .../taskmanager/TaskManagerFailsITCase.scala    |  3 +-
 18 files changed, 170 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
index 9b1f608..d74af08 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.webmonitor;
 
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
-import org.apache.flink.api.common.ExecutionConfig;
+
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -31,14 +31,16 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -81,7 +83,7 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger {
 			final int parallelism = 1;
 
 			final JobVertex task = new JobVertex("Task");
-			task.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
+			task.setInvokableClass(BlockingNoOpInvokable.class);
 			task.setParallelism(parallelism);
 
 			jobGraph.addVertex(task);

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index ea4d322..a346a80 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -28,11 +28,13 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
-import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+
 import org.junit.Test;
+
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -58,7 +60,7 @@ public class CoordinatorShutdownTest {
 			
 			// build a test graph with snapshotting enabled
 			JobVertex vertex = new JobVertex("Test Vertex");
-			vertex.setInvokableClass(Tasks.NoOpInvokable.class);
+			vertex.setInvokableClass(NoOpInvokable.class);
 			List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID());
 			
 			JobGraph testGraph = new JobGraph("test job", vertex);
@@ -110,7 +112,7 @@ public class CoordinatorShutdownTest {
 			
 			// build a test graph with snapshotting enabled
 			JobVertex vertex = new JobVertex("Test Vertex");
-			vertex.setInvokableClass(Tasks.NoOpInvokable.class);
+			vertex.setInvokableClass(NoOpInvokable.class);
 			List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID());
 
 			JobGraph testGraph = new JobGraph("test job", vertex);

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 70c2bf9..1f8845d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -41,13 +41,13 @@ import org.apache.flink.runtime.instance.Slot;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.Messages;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
@@ -89,7 +89,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 	
 			JobVertex jobVertex = new JobVertex("TestVertex");
 			jobVertex.setParallelism(parallelism);
-			jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+			jobVertex.setInvokableClass(NoOpInvokable.class);
 			JobGraph jobGraph = new JobGraph("Test Job", jobVertex);
 	
 			Configuration config = new Configuration();

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 0d09e38..9fbda51 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -36,15 +36,16 @@ import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
+
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
@@ -102,8 +103,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
 		scheduler.newInstanceAvailable(instance);
 
-		JobVertex groupVertex = newJobVertex("Task1", NUM_TASKS, Tasks.NoOpInvokable.class);
-		JobVertex groupVertex2 = newJobVertex("Task2", NUM_TASKS, Tasks.NoOpInvokable.class);
+		JobVertex groupVertex = newJobVertex("Task1", NUM_TASKS, NoOpInvokable.class);
+		JobVertex groupVertex2 = newJobVertex("Task2", NUM_TASKS, NoOpInvokable.class);
 
 		SlotSharingGroup sharingGroup = new SlotSharingGroup();
 		groupVertex.setSlotSharingGroup(sharingGroup);
@@ -237,7 +238,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 			new FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE));
 
 		JobVertex jobVertex = new JobVertex("NoOpInvokable");
-		jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+		jobVertex.setInvokableClass(NoOpInvokable.class);
 		jobVertex.setParallelism(NUM_TASKS);
 
 		JobGraph jobGraph = new JobGraph("TestJob", jobVertex);
@@ -369,8 +370,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
 		scheduler.newInstanceAvailable(instance);
 
-		JobVertex sender = newJobVertex("Task1", 1, Tasks.NoOpInvokable.class);
-		JobVertex receiver = newJobVertex("Task2", 1, Tasks.NoOpInvokable.class);
+		JobVertex sender = newJobVertex("Task1", 1, NoOpInvokable.class);
+		JobVertex receiver = newJobVertex("Task2", 1, NoOpInvokable.class);
 		JobGraph jobGraph = new JobGraph("Pointwise job", sender, receiver);
 		ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000));
 		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
@@ -432,7 +433,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
 		scheduler.newInstanceAvailable(instance);
 
-		JobVertex vertex = newJobVertex("Test Vertex", 1, Tasks.NoOpInvokable.class);
+		JobVertex vertex = newJobVertex("Test Vertex", 1, NoOpInvokable.class);
 
 		ExecutionConfig executionConfig = new ExecutionConfig();
 		executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(
@@ -477,7 +478,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
 		scheduler.newInstanceAvailable(instance);
 
-		JobVertex vertex = newJobVertex("Test Vertex", 1, Tasks.NoOpInvokable.class);
+		JobVertex vertex = newJobVertex("Test Vertex", 1, NoOpInvokable.class);
 
 		ExecutionConfig executionConfig = new ExecutionConfig();
 		executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(
@@ -525,7 +526,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		scheduler.newInstanceAvailable(instance);
 
 		JobVertex sender = new JobVertex("Task");
-		sender.setInvokableClass(Tasks.NoOpInvokable.class);
+		sender.setInvokableClass(NoOpInvokable.class);
 		sender.setParallelism(NUM_TASKS);
 
 		JobGraph jobGraph = new JobGraph("Pointwise job", sender);
@@ -639,7 +640,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
 		scheduler.newInstanceAvailable(instance);
 
-		JobVertex sender = newJobVertex("Task", NUM_TASKS, Tasks.NoOpInvokable.class);
+		JobVertex sender = newJobVertex("Task", NUM_TASKS, NoOpInvokable.class);
 
 		JobGraph jobGraph = new JobGraph("Pointwise job", sender);
 
@@ -656,7 +657,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		return new Tuple2<>(eg, instance);
 	}
 
-	private static JobVertex newJobVertex(String task1, int numTasks, Class<Tasks.NoOpInvokable> invokable) {
+	private static JobVertex newJobVertex(String task1, int numTasks, Class<NoOpInvokable> invokable) {
 		JobVertex groupVertex = new JobVertex(task1);
 		groupVertex.setInvokableClass(invokable);
 		groupVertex.setParallelism(numTasks);

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 183477a..3019b06 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -80,6 +80,7 @@ import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist;
 import org.apache.flink.runtime.testingUtils.TestingTaskManager;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
 import org.apache.flink.runtime.testutils.StoppableInvokable;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -169,7 +170,7 @@ public class JobManagerTest {
 						// Create a task
 						final JobVertex sender = new JobVertex("Sender");
 						sender.setParallelism(1);
-						sender.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block
+						sender.setInvokableClass(BlockingNoOpInvokable.class); // just block
 						sender.createAndAddResultDataSet(rid, PIPELINED);
 
 						final JobGraph jobGraph = new JobGraph("Blocking test job", sender);
@@ -340,7 +341,7 @@ public class JobManagerTest {
 						// Create a task
 						final JobVertex sender = new JobVertex("Sender");
 						sender.setParallelism(1);
-						sender.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block
+						sender.setInvokableClass(BlockingNoOpInvokable.class); // just block
 
 						final JobGraph jobGraph = new JobGraph("Non-Stoppable batching test job", sender);
 						final JobID jid = jobGraph.getJobID();
@@ -442,11 +443,11 @@ public class JobManagerTest {
 		JobGraph jobGraph = new JobGraph("croissant");
 		JobVertex jobVertex1 = new JobVertex("cappuccino");
 		jobVertex1.setParallelism(4);
-		jobVertex1.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
+		jobVertex1.setInvokableClass(BlockingNoOpInvokable.class);
 
 		JobVertex jobVertex2 = new JobVertex("americano");
 		jobVertex2.setParallelism(4);
-		jobVertex2.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
+		jobVertex2.setInvokableClass(BlockingNoOpInvokable.class);
 
 		jobGraph.addVertex(jobVertex1);
 		jobGraph.addVertex(jobVertex2);

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 959b9a7..f793524 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -18,9 +18,8 @@
 
 package org.apache.flink.runtime.jobmanager;
 
-import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
-import org.apache.flink.api.common.ExecutionConfig;
+
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -28,12 +27,12 @@ import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.util.NetUtils;
 
@@ -108,7 +107,7 @@ public class JobSubmitTest {
 		try {
 			// create a simple job graph
 			JobVertex jobVertex = new JobVertex("Test Vertex");
-			jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+			jobVertex.setInvokableClass(NoOpInvokable.class);
 			JobGraph jg = new JobGraph("test job", jobVertex);
 
 			// request the blob port from the job manager
@@ -172,7 +171,7 @@ public class JobSubmitTest {
 				}
 			};
 
-			jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+			jobVertex.setInvokableClass(NoOpInvokable.class);
 			JobGraph jg = new JobGraph("test job", jobVertex);
 
 			// submit the job

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index d1909fe..22a68b1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -67,6 +67,7 @@ import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
 import org.apache.flink.runtime.testutils.StoppableInvokable;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.SerializedValue;
@@ -1099,25 +1100,25 @@ public class TaskManagerTest extends TestLogger {
 
 				// Single blocking task
 				final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
-					new JobID(),
-					new AllocationID(),
-					"Job",
-					new JobVertexID(),
-					new ExecutionAttemptID(),
-					new SerializedValue<>(new ExecutionConfig()),
-					"Task",
-					1,
-					0,
-					1,
-					0,
-					new Configuration(),
-					new Configuration(),
-					Tasks.BlockingNoOpInvokable.class.getName(),
-					Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-					Collections.<InputGateDeploymentDescriptor>emptyList(),
-					Collections.<BlobKey>emptyList(),
-					Collections.<URL>emptyList(),
-					0);
+						new JobID(),
+						new AllocationID(),
+						"Job",
+						new JobVertexID(),
+						new ExecutionAttemptID(),
+						new SerializedValue<>(new ExecutionConfig()),
+						"Task",
+						1,
+						0,
+						1,
+						0,
+						new Configuration(),
+						new Configuration(),
+						BlockingNoOpInvokable.class.getName(),
+						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+						Collections.<InputGateDeploymentDescriptor>emptyList(),
+						Collections.<BlobKey>emptyList(),
+						Collections.<URL>emptyList(),
+						0);
 
 				// Submit the task
 				new Within(d) {
@@ -1220,7 +1221,7 @@ public class TaskManagerTest extends TestLogger {
 									// Look for BlockingNoOpInvokable#invoke
 									for (StackTraceElement elem : trace) {
 										if (elem.getClassName().equals(
-												Tasks.BlockingNoOpInvokable.class.getName())) {
+												BlockingNoOpInvokable.class.getName())) {
 
 											assertEquals("invoke", elem.getMethodName());
 											success = true;

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/BlockingNoOpInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/BlockingNoOpInvokable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/BlockingNoOpInvokable.java
new file mode 100644
index 0000000..c9adba8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/BlockingNoOpInvokable.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testtasks;
+
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+/**
+ * A task that does nothing but blocks indefinitely; {@link #invoke()} only exits, via an InterruptedException, once the executing thread is interrupted.
+ */
+public class BlockingNoOpInvokable extends AbstractInvokable {
+
+	@Override
+	public void invoke() throws Exception {
+		final Object o = new Object(); // private monitor: it never escapes this method, so nothing can notify() it
+		//noinspection SynchronizationOnLocalVariableOrMethodParameter
+		synchronized (o) {
+			//noinspection InfiniteLoopStatement
+			while (true) {
+				o.wait(); // the loop re-enters wait() after any spurious wakeup; only an interrupt ends it
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/NoOpInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/NoOpInvokable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/NoOpInvokable.java
new file mode 100644
index 0000000..fa9949a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/NoOpInvokable.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testtasks;
+
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+/**
+ * A simple task that does nothing and finishes immediately.
+ */
+public class NoOpInvokable extends AbstractInvokable {
+
+	@Override
+	public void invoke() {} // intentionally empty: the task finishes as soon as invoke() returns
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/WaitingNoOpInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/WaitingNoOpInvokable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/WaitingNoOpInvokable.java
new file mode 100644
index 0000000..de7d59a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/WaitingNoOpInvokable.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testtasks;
+
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+/**
+ * A simple task that does nothing and finishes after a short delay of 100 milliseconds.
+ */
+public class WaitingNoOpInvokable extends AbstractInvokable {
+
+	private static final long waitingTime = 100L; // in milliseconds (unit of Thread.sleep); NOTE(review): constant would conventionally be named WAITING_TIME
+
+	@Override
+	public void invoke() throws Exception {
+		Thread.sleep(waitingTime); // an interrupt during the delay propagates out as InterruptedException
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
index c30d244..ff0286d 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.testingUtils.TestingUtils
+import org.apache.flink.runtime.testtasks.NoOpInvokable
 import org.apache.flink.util.SerializedValue
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
@@ -48,7 +49,7 @@ class TaskManagerLossFailsTasksTest extends WordSpecLike with Matchers {
         scheduler.newInstanceAvailable(instance2)
 
         val sender = new JobVertex("Task")
-        sender.setInvokableClass(classOf[Tasks.NoOpInvokable])
+        sender.setInvokableClass(classOf[NoOpInvokable])
         sender.setParallelism(20)
 
         val jobGraph = new JobGraph("Pointwise job", sender)

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 0569297..4aa0565 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.{NoResourceAvailableExcepti
 import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
+import org.apache.flink.runtime.testtasks._
 import org.junit.runner.RunWith
 import org.mockito.Mockito
 import org.mockito.Mockito._

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
index 87c123a..fabd66b 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
@@ -25,26 +25,6 @@ import org.apache.flink.types.IntValue
 
 
 object Tasks {
-  class BlockingNoOpInvokable extends AbstractInvokable {
-    override def invoke(): Unit = {
-      val o = new Object()
-      o.synchronized{
-        o.wait()
-      }
-    }
-  }
-
-  class NoOpInvokable extends AbstractInvokable{
-    override def invoke(): Unit = {}
-  }
-
-  class WaitingNoOpInvokable extends AbstractInvokable{
-    val waitingTime = 100L
-
-    override def invoke(): Unit = {
-      Thread.sleep(waitingTime)
-    }
-  }
 
   class Sender extends AbstractInvokable{
 

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
index 178656d..256b1ae 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
@@ -25,9 +25,9 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -63,7 +63,7 @@ public class JobSubmissionFailsITCase {
 			cluster.start();
 			
 			final JobVertex jobVertex = new JobVertex("Working job vertex.");
-			jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+			jobVertex.setInvokableClass(NoOpInvokable.class);
 			workingJobGraph = new JobGraph("Working testing job", jobVertex);
 		}
 		catch (Exception e) {
@@ -113,7 +113,7 @@ public class JobSubmissionFailsITCase {
 	public void testExceptionInInitializeOnMaster() {
 		try {
 			final JobVertex failingJobVertex = new FailingJobVertex("Failing job vertex");
-			failingJobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+			failingJobVertex.setInvokableClass(NoOpInvokable.class);
 
 			final JobGraph failingJobGraph = new JobGraph("Failing testing job", failingJobVertex);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index 49eaeb7..92b90da 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -31,10 +31,10 @@ import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.leaderelection.TestingListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
 import org.apache.flink.runtime.testutils.JobManagerProcess;
@@ -372,7 +372,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 
 			// BLocking JobGraph
 			JobVertex blockingVertex = new JobVertex("Blocking vertex");
-			blockingVertex.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
+			blockingVertex.setInvokableClass(BlockingNoOpInvokable.class);
 			JobGraph jobGraph = new JobGraph(blockingVertex);
 
 			// Submit the job in detached mode

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
index bf39c4b..236e922 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
@@ -38,7 +38,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
-import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.leaderelection.TestingListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -48,6 +47,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
 import org.apache.flink.runtime.testutils.JobManagerProcess;
@@ -461,7 +461,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
 		JobGraph jobGraph = new JobGraph("Blocking program");
 
 		JobVertex jobVertex = new JobVertex("Blocking Vertex");
-		jobVertex.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
+		jobVertex.setInvokableClass(BlockingNoOpInvokable.class);
 
 		jobGraph.addVertex(jobVertex);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index 1b2838d..258282e 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -23,7 +23,7 @@ import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.jobgraph.{JobGraph, JobVertex}
-import org.apache.flink.runtime.jobmanager.Tasks.{BlockingNoOpInvokable, NoOpInvokable}
+import org.apache.flink.runtime.testtasks.{BlockingNoOpInvokable, NoOpInvokable}
 import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.messages.Messages.Acknowledge
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index 3b39b3f..e141cc2 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -25,7 +25,8 @@ import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.client.JobExecutionException
 import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex}
-import org.apache.flink.runtime.jobmanager.Tasks.{BlockingNoOpInvokable, BlockingReceiver, NoOpInvokable, Sender}
+import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender}
+import org.apache.flink.runtime.testtasks.{BlockingNoOpInvokable, NoOpInvokable}
 import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.messages.TaskManagerMessages.{NotifyWhenRegisteredAtJobManager, RegisteredAtJobManager}
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._


[2/5] flink git commit: [FLINK-4835] [cluster management] Add embedded version of the high-availability services

Posted by se...@apache.org.
[FLINK-4835] [cluster management] Add embedded version of the high-availability services

This includes the addition of the EmbeddedLeaderService
and a clean shutdown hook for all high availability services.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/79476624
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/79476624
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/79476624

Branch: refs/heads/flip-6
Commit: 7947662467d102edc675fde19948b7638a044343
Parents: da16b0a
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Oct 14 23:57:11 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Oct 16 22:09:39 2016 +0200

----------------------------------------------------------------------
 .../StandaloneCheckpointRecoveryFactory.java    |   4 +-
 .../highavailability/EmbeddedNonHaServices.java |  62 +++
 .../HighAvailabilityServices.java               |   7 +-
 .../runtime/highavailability/NonHaServices.java |  62 +--
 .../highavailability/ZookeeperHaServices.java   |  12 +-
 .../nonha/AbstractNonHaServices.java            | 175 +++++++
 .../nonha/EmbeddedLeaderService.java            | 466 +++++++++++++++++++
 .../TestingHighAvailabilityServices.java        |   9 +
 8 files changed, 736 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/79476624/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
index a9624fb..57785ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
@@ -40,8 +40,8 @@ public class StandaloneCheckpointRecoveryFactory implements CheckpointRecoveryFa
 	public CompletedCheckpointStore createCheckpointStore(JobID jobId, ClassLoader userClassLoader)
 			throws Exception {
 
-		return new StandaloneCompletedCheckpointStore(CheckpointRecoveryFactory
-				.NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN);
+		return new StandaloneCompletedCheckpointStore(
+				CheckpointRecoveryFactory.NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/79476624/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
new file mode 100644
index 0000000..58da287
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.runtime.highavailability.nonha.AbstractNonHaServices;
+import org.apache.flink.runtime.highavailability.nonha.EmbeddedLeaderService;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+/**
+ * An implementation of the {@link HighAvailabilityServices} for the non-high-availability case
+ * where all participants (ResourceManager, JobManagers, TaskManagers) run in the same process.
+ *
+ * <p>This implementation has no dependencies on any external services. The ResourceManager
+ * leader is elected among in-process contenders through an {@link EmbeddedLeaderService},
+ * and checkpoints and metadata are stored simply on the heap or on a local file system and
+ * therefore in a storage without any availability guarantees.
+ */
+public class EmbeddedNonHaServices extends AbstractNonHaServices implements HighAvailabilityServices {
+
+	/** Leader service used to elect and retrieve the ResourceManager leader. */
+	private final EmbeddedLeaderService resourceManagerLeaderService;
+
+	/**
+	 * Creates the embedded services. The ResourceManager leader service uses the executor
+	 * provided by {@link AbstractNonHaServices#getExecutorService()}.
+	 */
+	public EmbeddedNonHaServices() {
+		super();
+		this.resourceManagerLeaderService = new EmbeddedLeaderService(getExecutorService());
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** Creates a retriever for the ResourceManager leader from the embedded leader service. */
+	@Override
+	public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception {
+		return resourceManagerLeaderService.createLeaderRetrievalService();
+	}
+
+	/** Creates an election service for ResourceManager leader contenders. */
+	@Override
+	public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
+		return resourceManagerLeaderService.createLeaderElectionService();
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Shuts down the ResourceManager leader service and then the base services (the JobManager
+	 * leader services and the shared executor). The base services are shut down even if
+	 * shutting down the ResourceManager leader service fails.
+	 */
+	@Override
+	public void shutdown() throws Exception {
+		try {
+			// stop the leader service before the executor it was created with is shut down
+			resourceManagerLeaderService.shutdown();
+		}
+		finally {
+			super.shutdown();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79476624/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index 484cddb..f6db682 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -49,11 +49,10 @@ public interface HighAvailabilityServices {
 	 * Gets the leader retriever for the job JobMaster which is responsible for the given job
 	 *
 	 * @param jobID The identifier of the job.
-	 * @param defaultAddress address under which the job manager is reachable
 	 * @return
 	 * @throws Exception
 	 */
-	LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultAddress) throws Exception;
+	LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception;
 
 	/**
 	 * Gets the leader election service for the cluster's resource manager.
@@ -86,4 +85,8 @@ public interface HighAvailabilityServices {
 	 * Creates the BLOB store in which BLOBs are stored in a highly-available fashion.
 	 */
 	BlobStore createBlobStore() throws IOException;
+
+	// ------------------------------------------------------------------------
+
+	void shutdown() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/79476624/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index 1c73c01..107cbd0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -18,21 +18,13 @@
 
 package org.apache.flink.runtime.highavailability;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.blob.BlobStore;
-import org.apache.flink.runtime.blob.VoidBlobStore;
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
-import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
-import org.apache.flink.runtime.highavailability.nonha.NonHaRegistry;
-import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
-import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+import org.apache.flink.runtime.highavailability.nonha.AbstractNonHaServices;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -41,35 +33,23 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * This implementation can be used for testing, and for cluster setups that do not
  * tolerate failures of the master processes (JobManager, ResourceManager).
  * 
- * <p>This implementation has no dependencies on any external services. It returns fix
- * pre-configured leaders, and stores checkpoints and metadata simply on the heap and therefore
- * in volatile memory.
+ * <p>This implementation has no dependencies on any external services. It returns a fixed,
+ * pre-configured ResourceManager, and stores checkpoints and metadata simply on the heap or
+ * on a local file system and therefore in a storage without any availability guarantees.
  */
-public class NonHaServices implements HighAvailabilityServices {
+public class NonHaServices extends AbstractNonHaServices implements HighAvailabilityServices {
 
 	/** The fix address of the ResourceManager */
 	private final String resourceManagerAddress;
 
-	private final ConcurrentHashMap<JobID, String> jobMastersAddress;
-
 	/**
 	 * Creates a new services class for the fix pre-defined leaders.
 	 * 
 	 * @param resourceManagerAddress    The fix address of the ResourceManager
 	 */
 	public NonHaServices(String resourceManagerAddress) {
+		super();
 		this.resourceManagerAddress = checkNotNull(resourceManagerAddress);
-		this.jobMastersAddress = new ConcurrentHashMap<>(16);
-	}
-
-	/**
-	 * Binds address of a specified job master
-	 *
-	 * @param jobID            JobID for the specified job master
-	 * @param jobMasterAddress address for the specified job master
-	 */
-	public void bindJobMasterLeaderAddress(JobID jobID, String jobMasterAddress) {
-		jobMastersAddress.put(jobID, jobMasterAddress);
 	}
 
 	// ------------------------------------------------------------------------
@@ -82,37 +62,7 @@ public class NonHaServices implements HighAvailabilityServices {
 	}
 
 	@Override
-	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultAddress) throws Exception {
-		return new StandaloneLeaderRetrievalService(defaultAddress, new UUID(0, 0));
-	}
-
-	@Override
 	public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
 		return new StandaloneLeaderElectionService();
 	}
-
-	@Override
-	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception {
-		return new StandaloneLeaderElectionService();
-	}
-
-	@Override
-	public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception {
-		return new StandaloneCheckpointRecoveryFactory();
-	}
-
-	@Override
-	public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
-		return new StandaloneSubmittedJobGraphStore();
-	}
-
-	@Override
-	public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
-		return new NonHaRegistry();
-	}
-
-	@Override
-	public BlobStore createBlobStore() {
-		return new VoidBlobStore();
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/79476624/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
index bbe8ecb..eae45ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
@@ -85,7 +85,8 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
 	private static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock";
 
 	// ------------------------------------------------------------------------
-
+	
+	
 	/** The ZooKeeper client to use */
 	private final CuratorFramework client;
 
@@ -168,6 +169,15 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
 	}
 
 	// ------------------------------------------------------------------------
+	//  Shutdown
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void shutdown() throws Exception {
+		client.close();
+	}
+
+	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/79476624/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
new file mode 100644
index 0000000..8c15a52
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.BlobStore;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Base for all {@link HighAvailabilityServices} that are not highly available, but are backed
+ * by storage that has no availability guarantees and leader election services that cannot
+ * elect among multiple distributed leader contenders.
+ *
+ * <p>The JobManager leader election and retrieval services handed out by this class are
+ * backed by {@link EmbeddedLeaderService} instances that share one executor owned by this
+ * class. {@link #shutdown()} stops those leader services first and then the executor.
+ */
+public abstract class AbstractNonHaServices implements HighAvailabilityServices {
+
+	/** Guards {@link #jobManagerLeaderServices} and the transition to the shut down state. */
+	private final Object lock = new Object();
+
+	/** Executor shared by all embedded leader services created by this instance. */
+	private final ExecutorService executor;
+
+	/** Leader services of the JobManagers, one per job, created lazily on first access. */
+	@GuardedBy("lock")
+	private final HashMap<JobID, EmbeddedLeaderService> jobManagerLeaderServices;
+
+	/** The registry returned by {@link #getRunningJobsRegistry()}. */
+	private final RunningJobsRegistry runningJobsRegistry;
+
+	/**
+	 * Set once {@link #shutdown()} has been called. Volatile, because the service accessors
+	 * that do not synchronize on {@link #lock} read it as well.
+	 */
+	private volatile boolean shutdown;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates the base services with a cached pool of daemon threads and an initially
+	 * empty set of JobManager leader services.
+	 */
+	public AbstractNonHaServices() {
+		this.executor = Executors.newCachedThreadPool(new ServicesThreadFactory());
+		this.jobManagerLeaderServices = new HashMap<>();
+		this.runningJobsRegistry = new NonHaRegistry();
+	}
+
+	// ------------------------------------------------------------------------
+	//  services
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets a leader retrieval service for the JobManager of the given job. Retrievers and
+	 * election services for the same job are backed by the same {@link EmbeddedLeaderService}.
+	 *
+	 * @param jobID The identifier of the job, must not be null.
+	 * @return A leader retrieval service created by the job's embedded leader service.
+	 * @throws IllegalStateException If these services have been shut down.
+	 */
+	@Override
+	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception {
+		checkNotNull(jobID);
+
+		synchronized (lock) {
+			checkNotShutdown();
+			EmbeddedLeaderService service = getOrCreateJobManagerService(jobID);
+			return service.createLeaderRetrievalService();
+		}
+	}
+
+	/**
+	 * Gets a leader election service for the JobManager of the given job. Retrievers and
+	 * election services for the same job are backed by the same {@link EmbeddedLeaderService}.
+	 *
+	 * @param jobID The identifier of the job, must not be null.
+	 * @return A leader election service created by the job's embedded leader service.
+	 * @throws IllegalStateException If these services have been shut down.
+	 */
+	@Override
+	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception {
+		checkNotNull(jobID);
+
+		synchronized (lock) {
+			checkNotShutdown();
+			EmbeddedLeaderService service = getOrCreateJobManagerService(jobID);
+			return service.createLeaderElectionService();
+		}
+	}
+
+	/** Returns the leader service of the given job, creating and registering it if absent. */
+	@GuardedBy("lock")
+	private EmbeddedLeaderService getOrCreateJobManagerService(JobID jobID) {
+		EmbeddedLeaderService service = jobManagerLeaderServices.get(jobID);
+		if (service == null) {
+			service = new EmbeddedLeaderService(executor);
+			jobManagerLeaderServices.put(jobID, service);
+		}
+		return service;
+	}
+
+	/** Creates a new {@link StandaloneCheckpointRecoveryFactory} on every call. */
+	@Override
+	public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception {
+		checkNotShutdown();
+		return new StandaloneCheckpointRecoveryFactory();
+	}
+
+	/** Creates a new {@link StandaloneSubmittedJobGraphStore} on every call. */
+	@Override
+	public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
+		checkNotShutdown();
+		return new StandaloneSubmittedJobGraphStore();
+	}
+
+	/** Returns the one {@link RunningJobsRegistry} instance created with these services. */
+	@Override
+	public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
+		checkNotShutdown();
+		return runningJobsRegistry;
+	}
+
+	/** Creates a new {@link VoidBlobStore} on every call. */
+	@Override
+	public BlobStore createBlobStore() throws IOException {
+		checkNotShutdown();
+		return new VoidBlobStore();
+	}
+
+	// ------------------------------------------------------------------------
+	//  shutdown
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Shuts down the JobManager leader services and then the shared executor. Afterwards, the
+	 * service accessors of this class fail with an {@link IllegalStateException}. Calling this
+	 * method again has no further effect.
+	 *
+	 * @throws Exception If shutting down a leader service fails; the executor is shut down
+	 *                   regardless.
+	 */
+	@Override
+	public void shutdown() throws Exception {
+		synchronized (lock) {
+			if (!shutdown) {
+				shutdown = true;
+
+				try {
+					// stop the leader services first, while the executor they were
+					// created with still accepts tasks
+					for (EmbeddedLeaderService service : jobManagerLeaderServices.values()) {
+						service.shutdown();
+					}
+				}
+				finally {
+					jobManagerLeaderServices.clear();
+
+					// no further calls should be dispatched
+					executor.shutdownNow();
+				}
+			}
+		}
+	}
+
+	/** Fails with an {@link IllegalStateException} if {@link #shutdown()} has been called. */
+	private void checkNotShutdown() {
+		checkState(!shutdown, "high availability services are shut down");
+	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the executor shared by the embedded leader services. Subclasses can use it to
+	 * create further {@link EmbeddedLeaderService} instances.
+	 */
+	protected ExecutorService getExecutorService() {
+		return executor;
+	}
+
+	/** Thread factory that creates named daemon threads for the shared executor. */
+	private static final class ServicesThreadFactory implements ThreadFactory {
+
+		private final AtomicInteger enumerator = new AtomicInteger();
+
+		@Override
+		public Thread newThread(@Nonnull Runnable r) {
+			Thread thread = new Thread(r, "Flink HA services thread #" + enumerator.incrementAndGet());
+			thread.setDaemon(true);
+			return thread;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79476624/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
new file mode 100644
index 0000000..84ac551
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha;
+
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A simple leader election service, which selects a leader among contenders and notifies listeners.
+ * 
+ * <p>An election service for contenders can be created via {@link #createLeaderElectionService()},
+ * a listener service for leader observers can be created via {@link #createLeaderRetrievalService()}.
+ */
+public class EmbeddedLeaderService {
+
+	private static final Logger LOG = LoggerFactory.getLogger(EmbeddedLeaderService.class);
+
+	private final Object lock = new Object();
+
+	private final Executor notificationExecutor;
+
+	private final Set<EmbeddedLeaderElectionService> allLeaderContenders;
+
+	private final Set<EmbeddedLeaderRetrievalService> listeners;
+
+	/** proposed leader, which has been notified of leadership grant, but has not confirmed */
+	private EmbeddedLeaderElectionService currentLeaderProposed;
+
+	/** actual leader that has confirmed leadership and of which listeners have been notified */
+	private EmbeddedLeaderElectionService currentLeaderConfirmed;
+
+	/** fencing UID for the current leader (or proposed leader) */
+	private UUID currentLeaderSessionId;
+
+	/** the cached address of the current leader */
+	private String currentLeaderAddress;
+
+	/** flag marking the service as terminated */
+	private boolean shutdown;
+
+	// ------------------------------------------------------------------------
+
+	public EmbeddedLeaderService(ExecutorService notificationsDispatcher) {
+		this.notificationExecutor = checkNotNull(notificationsDispatcher);
+		this.allLeaderContenders = new HashSet<>();
+		this.listeners = new HashSet<>();
+	}
+
+	// ------------------------------------------------------------------------
+	//  shutdown and errors
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Shuts down this leader election service.
+	 * 
+	 * <p>This method does not perform a clean revocation of the leader status and
+	 * no notification to any leader listeners. It simply notifies all contenders
+	 * and listeners that the service is no longer available.
+	 */
+	public void shutdown() {
+		synchronized (lock) {
+			shutdownInternally(new Exception("Leader election service is shutting down"));
+		}
+	}
+
+	private void fatalError(Throwable error) {
+		LOG.error("Embedded leader election service encountered a fatal error. Shutting down service.", error);
+
+		synchronized (lock) {
+			shutdownInternally(new Exception("Leader election service is shutting down after a fatal error", error));
+		}
+	}
+
+	@GuardedBy("lock")
+	private void shutdownInternally(Exception exceptionForHandlers) {
+		assert Thread.holdsLock(lock);
+
+		if (!shutdown) {
+			// clear all leader status
+			currentLeaderProposed = null;
+			currentLeaderConfirmed = null;
+			currentLeaderSessionId = null;
+			currentLeaderAddress = null;
+
+			// fail all registered listeners
+			for (EmbeddedLeaderElectionService service : allLeaderContenders) {
+				service.shutdown(exceptionForHandlers);
+			}
+			allLeaderContenders.clear();
+
+			// fail all registered listeners
+			for (EmbeddedLeaderRetrievalService service : listeners) {
+				service.shutdown(exceptionForHandlers);
+			}
+			listeners.clear();
+
+			shutdown = true;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  creating contenders and listeners
+	// ------------------------------------------------------------------------
+
+	public LeaderElectionService createLeaderElectionService() {
+		checkState(!shutdown, "leader election service is shut down");
+		return new EmbeddedLeaderElectionService();
+	}
+
+	public LeaderRetrievalService createLeaderRetrievalService() {
+		checkState(!shutdown, "leader election service is shut down");
+		return new EmbeddedLeaderRetrievalService();
+	}
+
+	// ------------------------------------------------------------------------
+	//  adding and removing contenders & listeners
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Callback from leader contenders when they start their service.
+	 */
+	void addContender(EmbeddedLeaderElectionService service, LeaderContender contender) {
+		synchronized (lock) {
+			checkState(!shutdown, "leader election service is shut down");
+			checkState(!service.running, "leader election service is already started");
+
+			try {
+				if (!allLeaderContenders.add(service)) {
+					throw new IllegalStateException("leader election service was added to this service multiple times");
+				}
+
+				service.contender = contender;
+				service.running = true;
+
+				updateLeader();
+			}
+			catch (Throwable t) {
+				fatalError(t);
+			}
+		}
+	}
+
+	/**
+	 * Callback from leader contenders when they stop their service.
+	 */
+	void removeContender(EmbeddedLeaderElectionService service) {
+		synchronized (lock) {
+			// if the service was not even started, simply do nothing
+			if (!service.running || shutdown) {
+				return;
+			}
+
+			try {
+				if (!allLeaderContenders.remove(service)) {
+					throw new IllegalStateException("leader election service does not belong to this service");
+				}
+
+				// stop the service
+				service.contender = null;
+				service.running = false;
+				service.isLeader = false;
+
+				// if that was the current leader, unset its status
+				if (currentLeaderConfirmed == service) {
+					currentLeaderConfirmed = null;
+					currentLeaderSessionId = null;
+					currentLeaderAddress = null;
+				}
+				if (currentLeaderProposed == service) {
+					currentLeaderProposed = null;
+					currentLeaderSessionId = null;
+				}
+
+				updateLeader();
+			}
+			catch (Throwable t) {
+				fatalError(t);
+			}
+		}
+	}
+
+	/**
+	 * Callback from leader contenders when they confirm a leader grant
+	 */
+	void confirmLeader(final EmbeddedLeaderElectionService service, final UUID leaderSessionId) {
+		synchronized (lock) {
+			// if the service was shut down in the meantime, ignore this confirmation
+			if (!service.running || shutdown) {
+				return;
+			}
+
+			try {
+				// check if the confirmation is for the same grant, or whether it is a stale grant 
+				if (service == currentLeaderProposed && currentLeaderSessionId.equals(leaderSessionId)) {
+					final String address = service.contender.getAddress();
+					LOG.info("Received confirmation of leadership for leader {} / session={}", address, leaderSessionId);
+
+					// mark leadership
+					currentLeaderConfirmed = service;
+					currentLeaderAddress = address;
+					currentLeaderProposed = null;
+					service.isLeader = true;
+
+					// notify all listeners
+					for (EmbeddedLeaderRetrievalService listener : listeners) {
+						notificationExecutor.execute(
+								new NotifyOfLeaderCall(address, leaderSessionId, listener.listener, LOG));
+					}
+				}
+				else {
+					LOG.debug("Received confirmation of leadership for a stale leadership grant. Ignoring.");
+				}
+			}
+			catch (Throwable t) {
+				fatalError(t);
+			}
+		}
+	}
+
+	@GuardedBy("lock")
+	private void updateLeader() {
+		// this must be called under the lock
+		assert Thread.holdsLock(lock);
+
+		if (currentLeaderConfirmed == null && currentLeaderProposed == null) {
+			// we need a new leader
+			if (allLeaderContenders.isEmpty()) {
+				// no new leader available, tell everyone that there is no leader currently
+				for (EmbeddedLeaderRetrievalService listener : listeners) {
+					notificationExecutor.execute(
+							new NotifyOfLeaderCall(null, null, listener.listener, LOG));
+				}
+			}
+			else {
+				// propose a leader and ask it
+				final UUID leaderSessionId = UUID.randomUUID();
+				EmbeddedLeaderElectionService leaderService = allLeaderContenders.iterator().next();
+
+				currentLeaderSessionId = leaderSessionId;
+				currentLeaderProposed = leaderService;
+
+				notificationExecutor.execute(
+						new GrantLeadershipCall(leaderService.contender, leaderSessionId, LOG));
+			}
+		}
+	}
+
+	void addListener(EmbeddedLeaderRetrievalService service, LeaderRetrievalListener listener) {
+		synchronized (lock) {
+			checkState(!shutdown, "leader election service is shut down");
+			checkState(!service.running, "leader retrieval service is already started");
+
+			try {
+				if (!listeners.add(service)) {
+					throw new IllegalStateException("leader retrieval service was added to this service multiple times");
+				}
+
+				service.listener = listener;
+				service.running = true;
+
+				// if we already have a leader, immediately notify this new listener
+				if (currentLeaderConfirmed != null) {
+					notificationExecutor.execute(
+							new NotifyOfLeaderCall(currentLeaderAddress, currentLeaderSessionId, listener, LOG));
+				}
+			}
+			catch (Throwable t) {
+				fatalError(t);
+			}
+		}
+	}
+
+	void removeListener(EmbeddedLeaderRetrievalService service) {
+		synchronized (lock) {
+			// if the service was not even started, simply do nothing
+			if (!service.running || shutdown) {
+				return;
+			}
+
+			try {
+				if (!listeners.remove(service)) {
+					throw new IllegalStateException("leader retrieval service does not belong to this service");
+				}
+
+				// stop the service
+				service.listener = null;
+				service.running = false;
+			}
+			catch (Throwable t) {
+				fatalError(t);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  election and retrieval service implementations 
+	// ------------------------------------------------------------------------
+
+	private class EmbeddedLeaderElectionService implements LeaderElectionService {
+
+		volatile LeaderContender contender;
+
+		volatile boolean isLeader;
+
+		volatile boolean running;
+
+		@Override
+		public void start(LeaderContender contender) throws Exception {
+			checkNotNull(contender);
+			addContender(this, contender);
+		}
+
+		@Override
+		public void stop() throws Exception {
+			removeContender(this);
+		}
+
+		@Override
+		public void confirmLeaderSessionID(UUID leaderSessionID) {
+			checkNotNull(leaderSessionID);
+			confirmLeader(this, leaderSessionID);
+		}
+
+		@Override
+		public boolean hasLeadership() {
+			return isLeader;
+		}
+
+		void shutdown(Exception cause) {
+			if (running) {
+				running = false;
+				isLeader = false;
+				contender.handleError(cause);
+				contender = null;
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private class EmbeddedLeaderRetrievalService implements LeaderRetrievalService {
+
+		volatile LeaderRetrievalListener listener;
+
+		volatile boolean running;
+
+		@Override
+		public void start(LeaderRetrievalListener listener) throws Exception {
+			checkNotNull(listener);
+			addListener(this, listener);
+		}
+
+		@Override
+		public void stop() throws Exception {
+			removeListener(this);
+		}
+
+		public void shutdown(Exception cause) {
+			if (running) {
+				running = false;
+				listener.handleError(cause);
+				listener = null;
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  asynchronous notifications
+	// ------------------------------------------------------------------------
+
+	private static class NotifyOfLeaderCall implements Runnable {
+
+		@Nullable
+		private final String address;       // null if leader revoked without new leader
+		@Nullable
+		private final UUID leaderSessionId; // null if leader revoked without new leader
+
+		private final LeaderRetrievalListener listener;
+		private final Logger logger;
+
+		NotifyOfLeaderCall(
+				@Nullable String address,
+				@Nullable UUID leaderSessionId,
+				LeaderRetrievalListener listener,
+				Logger logger) {
+
+			this.address = address;
+			this.leaderSessionId = leaderSessionId;
+			this.listener = checkNotNull(listener);
+			this.logger = checkNotNull(logger);
+		}
+
+		@Override
+		public void run() {
+			try {
+				listener.notifyLeaderAddress(address, leaderSessionId);
+			}
+			catch (Throwable t) {
+				logger.warn("Error notifying leader listener about new leader", t);
+				listener.handleError(t instanceof Exception ? (Exception) t : new Exception(t));
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static class GrantLeadershipCall implements Runnable {
+
+		private final LeaderContender contender;
+		private final UUID leaderSessionId;
+		private final Logger logger;
+
+		GrantLeadershipCall(
+				LeaderContender contender,
+				UUID leaderSessionId,
+				Logger logger) {
+
+			this.contender = checkNotNull(contender);
+			this.leaderSessionId = checkNotNull(leaderSessionId);
+			this.logger = checkNotNull(logger);
+		}
+
+		@Override
+		public void run() {
+			try {
+				contender.grantLeadership(leaderSessionId);
+			}
+			catch (Throwable t) {
+				logger.warn("Error notifying leader listener about new leader", t);
+				contender.handleError(t instanceof Exception ? (Exception) t : new Exception(t));
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79476624/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 38e372d..3e88e8c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -154,4 +154,13 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 	public BlobStore createBlobStore() throws IOException {
 		// the testing services hand out a new VoidBlobStore on every call
 		final BlobStore blobStore = new VoidBlobStore();
 		return blobStore;
 	}
+
+	// ------------------------------------------------------------------------
+	//  Shutdown
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void shutdown() throws Exception {
+		// Intentionally empty: this method is meant to release shared, cross-service resources
+		// only, not to shut down the individual services handed out by these testing services.
+	}
 }


[3/5] flink git commit: [FLINK-4836] [cluster management] Add flink mini cluster (part 1)

Posted by se...@apache.org.
[FLINK-4836] [cluster management] Add flink mini cluster (part 1)

This implements
  - mini cluster configuration
  - startup / shutdown of common services (rpc, ha)
  - startup / shutdown of JobManager and Dispatcher


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/655722a2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/655722a2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/655722a2

Branch: refs/heads/flip-6
Commit: 655722a29edcca261653f54fa07fb7b4c94602a6
Parents: 7947662
Author: Stephan Ewen <se...@apache.org>
Authored: Sat Oct 15 00:25:41 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Oct 16 22:14:36 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/util/ExceptionUtils.java   |  62 ++-
 .../HighAvailabilityServicesUtils.java          |  17 +
 .../highavailability/ZookeeperHaServices.java   |   2 +-
 .../runtime/jobmaster/JobManagerRunner.java     |   1 -
 .../jobmaster/MiniClusterJobDispatcher.java     | 394 -----------------
 .../flink/runtime/minicluster/MiniCluster.java  | 406 ++++++++++++++++++
 .../minicluster/MiniClusterConfiguration.java   | 147 +++++++
 .../minicluster/MiniClusterJobDispatcher.java   | 418 +++++++++++++++++++
 .../resourcemanager/ResourceManager.java        |   2 +-
 .../runtime/taskexecutor/JobLeaderService.java  |   3 +-
 .../TestingHighAvailabilityServices.java        |   2 +-
 .../runtime/minicluster/MiniClusterITCase.java  |  79 ++++
 12 files changed, 1126 insertions(+), 407 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/655722a2/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 0f6f24f..d34b236 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -26,10 +26,13 @@ package org.apache.flink.util;
 
 import org.apache.flink.annotation.Internal;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 @Internal
 public final class ExceptionUtils {
 
@@ -58,7 +61,54 @@ public final class ExceptionUtils {
 			return e.getClass().getName() + " (error while printing stack trace)";
 		}
 	}
-	
+
+	/**
+	 * Adds a new exception as a {@link Throwable#addSuppressed(Throwable) suppressed exception}
+	 * to a prior exception, or returns the new exception, if no prior exception exists.
+	 *
+	 * <p>If both arguments are the same exception object, that exception is returned unchanged,
+	 * because a throwable cannot suppress itself ({@code addSuppressed} would throw an
+	 * {@link IllegalArgumentException} in that case).
+	 *
+	 * <pre>{@code
+	 *
+	 * public void closeAllThings() throws Exception {
+	 *     Exception ex = null;
+	 *     try {
+	 *         component.shutdown();
+	 *     } catch (Exception e) {
+	 *         ex = firstOrSuppressed(e, ex);
+	 *     }
+	 *     try {
+	 *         anotherComponent.stop();
+	 *     } catch (Exception e) {
+	 *         ex = firstOrSuppressed(e, ex);
+	 *     }
+	 *     try {
+	 *         lastComponent.shutdown();
+	 *     } catch (Exception e) {
+	 *         ex = firstOrSuppressed(e, ex);
+	 *     }
+	 *
+	 *     if (ex != null) {
+	 *         throw ex;
+	 *     }
+	 * }
+	 * }</pre>
+	 *
+	 * @param newException The newly occurred exception
+	 * @param previous     The previously occurred exception, possibly null.
+	 *
+	 * @return The new exception, if no previous exception exists, or the previous exception with the
+	 *         new exception in the list of suppressed exceptions.
+	 *
+	 * @throws NullPointerException If {@code newException} is null.
+	 */
+	public static <T extends Throwable> T firstOrSuppressed(T newException, @Nullable T previous) {
+		checkNotNull(newException, "newException");
+
+		if (previous == null) {
+			return newException;
+		}
+
+		// Throwable#addSuppressed rejects self-suppression with an IllegalArgumentException,
+		// so re-reporting the same exception object must leave it untouched
+		if (previous != newException) {
+			previous.addSuppressed(newException);
+		}
+		return previous;
+	}
+
 	/**
 	 * Throws the given {@code Throwable} in scenarios where the signatures do not allow you to
 	 * throw an arbitrary Throwable. Errors and RuntimeExceptions are thrown directly, other exceptions
@@ -161,10 +211,8 @@ public final class ExceptionUtils {
 		}
 	}
 
-	/**
-	 * Private constructor to prevent instantiation.
-	 */
-	private ExceptionUtils() {
-		throw new RuntimeException();
-	}
+	// ------------------------------------------------------------------------
+
+	/** Utility class with only static methods; the private constructor prevents instantiation. */
+	private ExceptionUtils() {
+		// not meant to be instantiated
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/655722a2/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index f3da847..9113309 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -24,6 +24,23 @@ import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 
 public class HighAvailabilityServicesUtils {
 
+	/**
+	 * Creates high availability services for the mode configured in the given configuration.
+	 *
+	 * <p>For mode {@code NONE}, embedded non-HA services are returned. ZooKeeper-based services
+	 * are not supported by this method yet.
+	 *
+	 * @param config The configuration to read the high availability mode from
+	 * @return The high availability services for the configured mode
+	 *
+	 * @throws UnsupportedOperationException If ZooKeeper high availability is configured
+	 * @throws Exception If the configured mode is not supported
+	 */
+	public static HighAvailabilityServices createAvailableOrEmbeddedServices(Configuration config) throws Exception {
+		final HighAvailabilityMode mode = LeaderRetrievalUtils.getRecoveryMode(config);
+
+		switch (mode) {
+			case ZOOKEEPER:
+				throw new UnsupportedOperationException(
+						"ZooKeeper high availability services have not been implemented yet.");
+
+			case NONE:
+				return new EmbeddedNonHaServices();
+
+			default:
+				throw new Exception("High availability mode " + mode + " is not supported.");
+		}
+	}
+	
+	
 	public static HighAvailabilityServices createHighAvailabilityServices(Configuration configuration) throws Exception {
 		HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(configuration);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/655722a2/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
index eae45ab..a9d2610 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
@@ -112,7 +112,7 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
 	}
 
 	@Override
-	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultAddress) throws Exception {
+	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception {
+		// the job's leader is retrieved from the ZooKeeper path derived from its JobID (getPathForJobManager)
 		return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathForJobManager(jobID));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/655722a2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 74ca6f3..3313d8a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -289,7 +289,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 	@Override
 	public void jobFinishedByOther() {
 		try {
-			unregisterJobFromHighAvailability();
 			shutdownInternally();
 		}
 		finally {

http://git-wip-us.apache.org/repos/asf/flink/blob/655722a2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
deleted file mode 100644
index 019ccfe..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
+++ /dev/null
@@ -1,394 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmaster;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmanager.OnCompletionActions;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-/**
- * The dispatcher that runs in the mini cluster, waits for jobs, and starts job masters
- * upon receiving jobs.
- */
-public class MiniClusterJobDispatcher {
-
-	private static final Logger LOG = LoggerFactory.getLogger(MiniClusterJobDispatcher.class);
-
-	// ------------------------------------------------------------------------
-
-	/** lock to ensure that this dispatcher executes only one job at a time */
-	private final Object lock = new Object();
-
-	/** the configuration with which the mini cluster was started */
-	private final Configuration configuration;
-
-	/** the RPC service used by the job managers */
-	private final RpcService rpcService;
-
-	/** services for discovery, leader election, and recovery */
-	private final HighAvailabilityServices haServices;
-
-	/** all the services that the JobManager needs, such as BLOB service, factories, etc */
-	private final JobManagerServices jobManagerServices;
-
-	/** Registry for all metrics in the mini cluster */
-	private final MetricRegistry metricRegistry;
-
-	/** The number of JobManagers to launch (more than one simulates a high-availability setup) */
-	private final int numJobManagers;
-
-	/** The runners of the JobManagers for the current job; non-null if a job is currently running */
-	private volatile JobManagerRunner[] runners;
-
-	/** flag marking the dispatcher as shut down */
-	private volatile boolean shutdown;
-
-
-	/**
-	 * Starts a mini cluster job dispatcher.
-	 *
-	 * <p>The dispatcher kicks off one JobManager per job, a behavior similar to a
-	 * non-highly-available setup.
-	 *
-	 * @param config The configuration of the mini cluster
-	 * @param rpcService The RPC service used by the JobManagers
-	 * @param haServices Access to the discovery, leader election, and recovery services
-	 * @param metricRegistry The registry for all metrics of the mini cluster
-	 *
-	 * @throws Exception Thrown, if the services for the JobMaster could not be started.
-	 */
-	public MiniClusterJobDispatcher(
-			Configuration config,
-			RpcService rpcService,
-			HighAvailabilityServices haServices,
-			MetricRegistry metricRegistry) throws Exception {
-		this(config, rpcService, haServices, metricRegistry, 1);
-	}
-
-	/**
-	 * Starts a mini cluster job dispatcher.
-	 *
-	 * <p>The dispatcher may kick off more than one JobManager per job, thus simulating
-	 * a highly-available setup.
-	 *
-	 * @param config The configuration of the mini cluster
-	 * @param rpcService The RPC service used by the JobManagers
-	 * @param haServices Access to the discovery, leader election, and recovery services
-	 * @param metricRegistry The registry for all metrics of the mini cluster
-	 * @param numJobManagers The number of JobMasters to start for each job (at least 1).
-	 *
-	 * @throws IllegalArgumentException Thrown, if numJobManagers is smaller than 1.
-	 * @throws Exception Thrown, if the services for the JobMaster could not be started.
-	 */
-	public MiniClusterJobDispatcher(
-			Configuration config,
-			RpcService rpcService,
-			HighAvailabilityServices haServices,
-			MetricRegistry metricRegistry,
-			int numJobManagers) throws Exception {
-
-		checkArgument(numJobManagers >= 1, "numJobManagers must be at least 1");
-		this.configuration = checkNotNull(config);
-		this.rpcService = checkNotNull(rpcService);
-		this.haServices = checkNotNull(haServices);
-		this.metricRegistry = checkNotNull(metricRegistry);
-		this.numJobManagers = numJobManagers;
-
-		LOG.info("Creating JobMaster services");
-		this.jobManagerServices = JobManagerServices.fromConfiguration(config, haServices);
-	}
-
-	// ------------------------------------------------------------------------
-	//  life cycle
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Shuts down the mini cluster dispatcher. If a job is currently running, that job will be
-	 * terminally failed. Calling this method more than once has no further effect.
-	 */
-	public void shutdown() {
-		synchronized (lock) {
-			if (shutdown) {
-				return;
-			}
-			shutdown = true;
-
-			LOG.info("Shutting down the dispatcher");
-
-			// move the runners reference onto the stack first, so that the loop below
-			// is not affected by concurrent modifications of the field
-			final JobManagerRunner[] activeRunners = this.runners;
-			this.runners = null;
-
-			if (activeRunners != null) {
-				for (JobManagerRunner activeRunner : activeRunners) {
-					activeRunner.shutdown();
-				}
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  submitting jobs
-	// ------------------------------------------------------------------------
-
-	/**
-	 * This method executes a job in detached mode. The method returns immediately after the
-	 * JobManager(s) for the job have been started; it does not wait for the job to complete.
-	 *
-	 * @param job  The Flink job to execute
-	 *
-	 * @throws JobExecutionException Thrown if anything went amiss during initial job launch.
-	 * @throws IllegalStateException Thrown if the dispatcher is shut down or already runs a job.
-	 */
-	public void runDetached(JobGraph job) throws JobExecutionException {
-		checkNotNull(job);
-
-		LOG.info("Received job for detached execution {} ({})", job.getName(), job.getJobID());
-
-		synchronized (lock) {
-			checkState(!shutdown, "mini cluster is shut down");
-			checkState(runners == null, "mini cluster can only execute one job at a time");
-
-			DetachedFinalizer finalizer = new DetachedFinalizer(numJobManagers);
-
-			// NOTE(review): this method never resets 'runners'; confirm that DetachedFinalizer
-			// clears it on completion, otherwise no further job can be submitted afterwards
-			this.runners = startJobRunners(job, finalizer, finalizer);
-		}
-	}
-
-	/**
-	 * This method runs a job in blocking mode. The method returns only after the job
-	 * completed successfully, or after it failed terminally.
-	 *
-	 * @param job  The Flink job to execute
-	 * @return The result of the job execution
-	 *
-	 * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
-	 *         or if the job terminally failed.
-	 * @throws InterruptedException Thrown if the calling thread is interrupted while waiting
-	 *         for the job result.
-	 */
-	public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
-		checkNotNull(job);
-
-		LOG.info("Received job for blocking execution {} ({})", job.getName(), job.getJobID());
-		final BlockingJobSync sync = new BlockingJobSync(job.getJobID(), numJobManagers);
-
-		synchronized (lock) {
-			checkState(!shutdown, "mini cluster is shut down");
-			checkState(runners == null, "mini cluster can only execute one job at a time");
-
-			this.runners = startJobRunners(job, sync, sync);
-		}
-
-		try {
-			return sync.getResult();
-		}
-		finally {
-			// always clear the status for the next job
-			// NOTE(review): if getResult() is interrupted, the runners keep running but are no
-			// longer tracked here, so shutdown() will not stop them - confirm this is intended
-			runners = null;
-		}
-	}
-
-	/**
-	 * Creates and starts one {@link JobManagerRunner} per configured JobManager for the given job.
-	 * If creating or starting any runner fails, every runner created so far is shut down
-	 * (best effort) and the failure is reported.
-	 *
-	 * @param job           The job to start the JobManagers for
-	 * @param onCompletion  The completion callback handed to every runner
-	 * @param errorHandler  The fatal error handler handed to every runner
-	 * @return The started runners, one per JobManager
-	 *
-	 * @throws JobExecutionException Thrown if any runner could not be created or started.
-	 */
-	private JobManagerRunner[] startJobRunners(
-			JobGraph job,
-			OnCompletionActions onCompletion,
-			FatalErrorHandler errorHandler) throws JobExecutionException {
-		LOG.info("Starting {} JobMaster(s) for job {} ({})", numJobManagers, job.getName(), job.getJobID());
-
-		JobManagerRunner[] runners = new JobManagerRunner[numJobManagers];
-		for (int i = 0; i < numJobManagers; i++) {
-			try {
-				runners[i] = new JobManagerRunner(job, configuration,
-						rpcService, haServices, jobManagerServices, metricRegistry,
-						onCompletion, errorHandler);
-				runners[i].start();
-			}
-			catch (Throwable t) {
-				// shut down every runner created so far, including runner 'i' if its
-				// constructor succeeded but start() failed (it is null otherwise)
-				for (int k = 0; k <= i; k++) {
-					try {
-						if (runners[k] != null) {
-							runners[k].shutdown();
-						}
-					} catch (Throwable ignored) {
-						// silent shutdown - the original failure 't' is the one reported
-					}
-				}
-
-				throw new JobExecutionException(job.getJobID(), "Could not start the JobManager(s) for the job", t);
-			}
-		}
-
-		return runners;
-	}
-
-	// ------------------------------------------------------------------------
-	//  test methods to simulate job master failures
-	// ------------------------------------------------------------------------
-
-//	public void killJobMaster(int which) {
-//		checkArgument(which >= 0 && which < numJobManagers, "no such job master");
-//		checkState(!shutdown, "mini cluster is shut down");
-//
-//		JobManagerRunner[] runners = this.runners;
-//		checkState(runners != null, "mini cluster is not executing a job right now");
-//
-//		runners[which].shutdown(new Throwable("kill JobManager"));
-//	}
-
-	// ------------------------------------------------------------------------
-	//  utility classes
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Completion callback and fatal error handler for jobs run in detached mode.
-	 * It counts the job's runners as they report back through any of the callbacks;
-	 * the last runner to report clears the dispatcher's runner array, so that the
-	 * mini cluster accepts new jobs again. In a high-availability test setup, there
-	 * may be multiple runners per job.
-	 */
-	private class DetachedFinalizer implements OnCompletionActions, FatalErrorHandler {
-
-		/** How many runners have not reported back yet */
-		private final AtomicInteger pendingRunners;
-
-		private DetachedFinalizer(int numJobManagersToWaitFor) {
-			pendingRunners = new AtomicInteger(numJobManagersToWaitFor);
-		}
-
-		@Override
-		public void jobFinished(JobExecutionResult result) {
-			markOneRunnerDone();
-		}
-
-		@Override
-		public void jobFailed(Throwable cause) {
-			markOneRunnerDone();
-		}
-
-		@Override
-		public void jobFinishedByOther() {
-			markOneRunnerDone();
-		}
-
-		@Override
-		public void onFatalError(Throwable exception) {
-			markOneRunnerDone();
-		}
-
-		/** Counts one runner as done; the runner that brings the count to zero frees the dispatcher. */
-		private void markOneRunnerDone() {
-			final int stillPending = pendingRunners.decrementAndGet();
-			if (stillPending == 0) {
-				// NOTE(review): this write happens without holding the dispatcher's lock
-				MiniClusterJobDispatcher.this.runners = null;
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * This class is used to sync on blocking jobs across multiple runners.
-	 * Only after all runners reported back that they are finished, the
-	 * result will be released.
-	 * 
-	 * That way it is guaranteed that after the blocking job submit call returns,
-	 * the dispatcher is immediately free to accept another job.
-	 */
-	private static class BlockingJobSync implements OnCompletionActions, FatalErrorHandler {
-
-		private final JobID jobId;
-
-		private final CountDownLatch jobMastersToWaitFor;
-
-		private volatile Throwable jobException;
-
-		private volatile Throwable runnerException;
-
-		private volatile JobExecutionResult result;
-		
-		BlockingJobSync(JobID jobId, int numJobMastersToWaitFor) {
-			this.jobId = jobId;
-			this.jobMastersToWaitFor = new CountDownLatch(numJobMastersToWaitFor);
-		}
-
-		@Override
-		public void jobFinished(JobExecutionResult jobResult) {
-			this.result = jobResult;
-			jobMastersToWaitFor.countDown();
-		}
-
-		@Override
-		public void jobFailed(Throwable cause) {
-			jobException = cause;
-			jobMastersToWaitFor.countDown();
-		}
-
-		@Override
-		public void jobFinishedByOther() {
-			this.jobMastersToWaitFor.countDown();
-		}
-
-		@Override
-		public void onFatalError(Throwable exception) {
-			if (runnerException == null) {
-				runnerException = exception;
-			}
-		}
-
-		public JobExecutionResult getResult() throws JobExecutionException, InterruptedException {
-			jobMastersToWaitFor.await();
-
-			final Throwable jobFailureCause = this.jobException;
-			final Throwable runnerException = this.runnerException;
-			final JobExecutionResult result = this.result;
-
-			// (1) we check if the job terminated with an exception
-			// (2) we check whether the job completed successfully
-			// (3) we check if we have exceptions from the JobManagers. the job may still have
-			//     completed successfully in that case, if multiple JobMasters were running
-			//     and other took over. only if all encounter a fatal error, the job cannot finish
-
-			if (jobFailureCause != null) {
-				if (jobFailureCause instanceof JobExecutionException) {
-					throw (JobExecutionException) jobFailureCause;
-				}
-				else {
-					throw new JobExecutionException(jobId, "The job execution failed", jobFailureCause);
-				}
-			}
-			else if (result != null) {
-				return result;
-			}
-			else if (runnerException != null) {
-				throw new JobExecutionException(jobId,
-						"The job execution failed because all JobManagers encountered fatal errors", runnerException);
-			}
-			else {
-				throw new IllegalStateException("Bug: Job finished with neither error nor result.");
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/655722a2/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
new file mode 100644
index 0000000..1ee38e0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import akka.actor.ActorSystem;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.util.ExceptionUtils;
+
+import scala.Option;
+import scala.Tuple2;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+
+/**
+ * A mini cluster that brings up, inside the current process, the services needed to run
+ * Flink jobs: a metric registry, the RPC service(s), the high-availability services, and a
+ * job dispatcher that launches JobManagers for submitted jobs.
+ *
+ * <p>The life cycle methods are guarded by an internal lock. After a shutdown, the mini
+ * cluster can be started again.
+ */
+public class MiniCluster {
+
+	/** The lock to guard startup / shutdown / manipulation methods */
+	private final Object lock = new Object();
+
+	/** The configuration for this mini cluster */
+	private final MiniClusterConfiguration config;
+
+	@GuardedBy("lock")
+	private MetricRegistry metricRegistry;
+
+	/** The RPC service shared by all components; only set in single-RPC-service mode */
+	@GuardedBy("lock")
+	private RpcService commonRpcService;
+
+	/** One RPC service per JobManager; only set in RPC-service-per-component mode */
+	@GuardedBy("lock")
+	private RpcService[] jobManagerRpcServices;
+
+	/** One RPC service per TaskManager; only set in RPC-service-per-component mode */
+	@GuardedBy("lock")
+	private RpcService[] taskManagerRpcServices;
+
+	@GuardedBy("lock")
+	private HighAvailabilityServices haServices;
+
+	@GuardedBy("lock")
+	private MiniClusterJobDispatcher jobDispatcher;
+
+	/**
+	 * Flag marking the mini cluster as started/running. Only written under the lock;
+	 * volatile so that {@link #isRunning()} sees the current value without taking the lock.
+	 */
+	@GuardedBy("lock")
+	private volatile boolean running;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new mini cluster with the default configuration:
+	 * <ul>
+	 *     <li>One JobManager</li>
+	 *     <li>One TaskManager</li>
+	 *     <li>One task slot in the TaskManager</li>
+	 *     <li>All components share the same RPC subsystem (minimizes communication overhead)</li>
+	 * </ul>
+	 */
+	public MiniCluster() {
+		this(new MiniClusterConfiguration());
+	}
+
+	/**
+	 * Creates a new mini cluster with the given configuration. The cluster is not started
+	 * by the constructor; call {@link #start()} to bring it up.
+	 *
+	 * @param config The configuration for the mini cluster
+	 */
+	public MiniCluster(MiniClusterConfiguration config) {
+		this.config = checkNotNull(config, "config may not be null");
+	}
+
+	/**
+	 * Creates a mini cluster based on the given configuration.
+	 *
+	 * @deprecated Use {@link #MiniCluster(MiniClusterConfiguration)} instead.
+	 * @see #MiniCluster(MiniClusterConfiguration)
+	 */
+	@Deprecated
+	public MiniCluster(Configuration config) {
+		this(createConfig(config, true));
+	}
+
+	/**
+	 * Creates a mini cluster based on the given configuration, starting one or more
+	 * RPC services, depending on the given flag.
+	 *
+	 * @deprecated Use {@link #MiniCluster(MiniClusterConfiguration)} instead.
+	 * @see #MiniCluster(MiniClusterConfiguration)
+	 */
+	@Deprecated
+	public MiniCluster(Configuration config, boolean singleRpcService) {
+		this(createConfig(config, singleRpcService));
+	}
+
+	// ------------------------------------------------------------------------
+	//  life cycle
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Checks if the mini cluster was started and is running.
+	 */
+	public boolean isRunning() {
+		return running;
+	}
+
+	/**
+	 * Starts the mini cluster, based on the configured properties.
+	 * If the startup fails, all components started so far are shut down again
+	 * before the exception is rethrown.
+	 *
+	 * @throws Exception This method passes on any exception that occurs during the startup of
+	 *                   the mini cluster.
+	 */
+	public void start() throws Exception {
+		synchronized (lock) {
+			checkState(!running, "FlinkMiniCluster is already running");
+
+			final Configuration configuration = new UnmodifiableConfiguration(config.getConfiguration());
+			final Time rpcTimeout = config.getRpcTimeout();
+			final int numJobManagers = config.getNumJobManagers();
+			final int numTaskManagers = config.getNumTaskManagers();
+			final boolean singleRpc = config.getUseSingleRpcSystem();
+
+			try {
+				metricRegistry = createMetricRegistry(configuration);
+
+				RpcService[] jobManagerRpcServices = new RpcService[numJobManagers];
+				RpcService[] taskManagerRpcServices = new RpcService[numTaskManagers];
+
+				// bring up all the RPC services
+				if (singleRpc) {
+					// one common RPC for all
+					commonRpcService = createRpcService(configuration, rpcTimeout, false, null);
+
+					// set that same RPC service for all JobManagers and TaskManagers.
+					// these arrays are deliberately not stored in the fields, so that
+					// the shutdown stops the common service exactly once
+					for (int i = 0; i < numJobManagers; i++) {
+						jobManagerRpcServices[i] = commonRpcService;
+					}
+					for (int i = 0; i < numTaskManagers; i++) {
+						taskManagerRpcServices[i] = commonRpcService;
+					}
+				}
+				else {
+					// start a new service per component, possibly with custom bind addresses
+					final String jobManagerBindAddress = config.getJobManagerBindAddress();
+					final String taskManagerBindAddress = config.getTaskManagerBindAddress();
+
+					// publish the arrays before filling them: if creating one of the services
+					// fails, shutdownInternally() still stops all services created so far
+					// (entries not yet filled are null and are skipped by shutDownRpc())
+					this.jobManagerRpcServices = jobManagerRpcServices;
+					this.taskManagerRpcServices = taskManagerRpcServices;
+
+					for (int i = 0; i < numJobManagers; i++) {
+						jobManagerRpcServices[i] = createRpcService(
+								configuration, rpcTimeout, true, jobManagerBindAddress);
+					}
+
+					for (int i = 0; i < numTaskManagers; i++) {
+						taskManagerRpcServices[i] = createRpcService(
+								configuration, rpcTimeout, true, taskManagerBindAddress);
+					}
+				}
+
+				// create the high-availability services
+				haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration);
+
+				// bring up the dispatcher that launches JobManagers when jobs submitted
+				jobDispatcher = new MiniClusterJobDispatcher(
+						configuration, haServices, metricRegistry, numJobManagers, jobManagerRpcServices);
+			}
+			catch (Exception e) {
+				// cleanup everything
+				try {
+					shutdownInternally();
+				} catch (Exception ee) {
+					e.addSuppressed(ee);
+				}
+				throw e;
+			}
+
+			// now officially mark this as running
+			running = true;
+		}
+	}
+
+	/**
+	 * Shuts down the mini cluster, failing all currently executing jobs.
+	 * The mini cluster can be started again by calling the {@link #start()} method again.
+	 *
+	 * <p>This method shuts down all started services and components,
+	 * even if an exception occurs in the process of shutting down some component.
+	 *
+	 * @throws Exception Thrown, if the shutdown did not complete cleanly.
+	 */
+	public void shutdown() throws Exception {
+		synchronized (lock) {
+			if (running) {
+				try {
+					shutdownInternally();
+				} finally {
+					running = false;
+				}
+			}
+		}
+	}
+
+	/**
+	 * Shuts down the job dispatcher, the high-availability services, all RPC services, and the
+	 * metric registry (in that order), clearing each field. Every step is attempted even if an
+	 * earlier one fails; the first failure is rethrown with later ones attached as suppressed.
+	 */
+	@GuardedBy("lock")
+	private void shutdownInternally() throws Exception {
+		// this should always be called under the lock
+		assert Thread.holdsLock(lock);
+
+		// collect the first exception, but continue and add all successive
+		// exceptions as suppressed
+		Throwable exception = null;
+
+		// cancel all jobs and shut down the job dispatcher
+		if (jobDispatcher != null) {
+			try {
+				jobDispatcher.shutdown();
+			} catch (Exception e) {
+				exception = firstOrSuppressed(e, exception);
+			}
+			jobDispatcher = null;
+		}
+
+		// shut down high-availability services
+		if (haServices != null) {
+			try {
+				haServices.shutdown();
+			} catch (Exception e) {
+				exception = firstOrSuppressed(e, exception);
+			}
+			haServices = null;
+		}
+
+		// shut down the RpcServices
+		if (commonRpcService != null) {
+			exception = shutDownRpc(commonRpcService, exception);
+			commonRpcService = null;
+		}
+		if (jobManagerRpcServices != null) {
+			for (RpcService service : jobManagerRpcServices) {
+				exception = shutDownRpc(service, exception);
+			}
+			jobManagerRpcServices = null;
+		}
+		if (taskManagerRpcServices != null) {
+			for (RpcService service : taskManagerRpcServices) {
+				exception = shutDownRpc(service, exception);
+			}
+			taskManagerRpcServices = null;
+		}
+
+		// metrics shutdown - guarded like the other steps, so that a failure here does not
+		// replace the exceptions collected so far
+		if (metricRegistry != null) {
+			try {
+				metricRegistry.shutdown();
+			} catch (Exception e) {
+				exception = firstOrSuppressed(e, exception);
+			}
+			metricRegistry = null;
+		}
+
+		// if anything went wrong, throw the first error with all the additional suppressed exceptions
+		if (exception != null) {
+			ExceptionUtils.rethrowException(exception, "Error while shutting down mini cluster");
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  running jobs
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This method executes a job in detached mode. The method returns immediately after the job
+	 * has been submitted to the job dispatcher.
+	 *
+	 * @param job  The Flink job to execute
+	 *
+	 * @throws JobExecutionException Thrown if anything went amiss during initial job launch.
+	 * @throws IllegalStateException Thrown if the mini cluster is not running.
+	 */
+	public void runDetached(JobGraph job) throws JobExecutionException {
+		checkNotNull(job, "job is null");
+
+		synchronized (lock) {
+			checkState(running, "mini cluster is not running");
+			jobDispatcher.runDetached(job);
+		}
+	}
+
+	/**
+	 * This method runs a job in blocking mode. The method returns only after the job
+	 * completed successfully, or after it failed terminally.
+	 *
+	 * <p>The wait for the job happens outside the cluster lock, so the cluster can be
+	 * shut down while a blocking job is executing.
+	 *
+	 * @param job  The Flink job to execute
+	 * @return The result of the job execution
+	 *
+	 * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
+	 *         or if the job terminally failed.
+	 * @throws InterruptedException Thrown if the calling thread is interrupted while waiting.
+	 */
+	public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
+		checkNotNull(job, "job is null");
+
+		MiniClusterJobDispatcher dispatcher;
+		synchronized (lock) {
+			checkState(running, "mini cluster is not running");
+			dispatcher = this.jobDispatcher;
+		}
+
+		return dispatcher.runJobBlocking(job);
+	}
+
+	// ------------------------------------------------------------------------
+	//  factories - can be overridden by subclasses to alter behavior
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Factory method to create the metric registry for the mini cluster.
+	 *
+	 * @param config The configuration of the mini cluster
+	 */
+	protected MetricRegistry createMetricRegistry(Configuration config) {
+		return new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+	}
+
+	/**
+	 * Factory method to instantiate the RPC service.
+	 *
+	 * @param config
+	 *            The configuration of the mini cluster
+	 * @param askTimeout
+	 *            The default RPC timeout for asynchronous "ask" requests.
+	 * @param remoteEnabled
+	 *            True, if the RPC service should be reachable from other (remote) RPC services.
+	 * @param bindAddress
+	 *            The address to bind the RPC service to. Only relevant when "remoteEnabled" is true.
+	 *
+	 * @return The instantiated RPC service
+	 */
+	protected RpcService createRpcService(
+			Configuration config,
+			Time askTimeout,
+			boolean remoteEnabled,
+			String bindAddress) {
+
+		ActorSystem actorSystem;
+		if (remoteEnabled) {
+			// port 0 lets the system pick a free port
+			Tuple2<String, Object> remoteSettings = new Tuple2<String, Object>(bindAddress, 0);
+			actorSystem = AkkaUtils.createActorSystem(config, Option.apply(remoteSettings));
+		} else {
+			actorSystem = AkkaUtils.createLocalActorSystem(config);
+		}
+
+		return new AkkaRpcService(actorSystem, askTimeout);
+	}
+
+	// ------------------------------------------------------------------------
+	//  miscellaneous utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Stops the given RPC service (a null service is skipped) and merges any failure into
+	 * the given prior exception.
+	 *
+	 * @return The prior exception, or the merged exception if stopping the service failed
+	 */
+	private static Throwable shutDownRpc(RpcService rpcService, Throwable priorException) {
+		try {
+			if (rpcService != null) {
+				rpcService.stopService();
+			}
+			return priorException;
+		}
+		catch (Throwable t) {
+			return firstOrSuppressed(t, priorException);
+		}
+	}
+
+	/**
+	 * Builds a mini cluster configuration from a plain Flink configuration (null means defaults),
+	 * using either one shared RPC service or one RPC service per component.
+	 */
+	private static MiniClusterConfiguration createConfig(Configuration cfg, boolean singleActorSystem) {
+		MiniClusterConfiguration config = cfg == null ?
+				new MiniClusterConfiguration() :
+				new MiniClusterConfiguration(cfg);
+
+		if (!singleActorSystem) {
+			config.setUseRpcServicePerComponent();
+		}
+
+		return config;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/655722a2/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
new file mode 100644
index 0000000..a8d7b10
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Settings for a {@link MiniCluster}: a Flink {@link Configuration}, the number of JobManagers
+ * and TaskManagers, whether all components share one RPC service, and an optional bind address
+ * that overrides the configured JobManager / TaskManager addresses.
+ */
+public class MiniClusterConfiguration {
+
+	/** The Flink configuration; a copy of the configuration passed to the constructor, if any */
+	private final Configuration config;
+
+	/** True (default) if all components share one RPC service, false for one per component */
+	private boolean singleRpcService = true;
+
+	private int numJobManagers = 1;
+
+	private int numTaskManagers = 1;
+
+	/** If non-null, used as bind address for both JobManagers and TaskManagers */
+	private String commonBindAddress;
+
+	// ------------------------------------------------------------------------
+	//  Construction
+	// ------------------------------------------------------------------------
+
+	/** Creates a configuration with default settings and an empty Flink configuration. */
+	public MiniClusterConfiguration() {
+		this.config = new Configuration();
+	}
+
+	/**
+	 * Creates a configuration based on a copy of the given Flink configuration.
+	 *
+	 * @param config The Flink configuration to copy; must not be null
+	 */
+	public MiniClusterConfiguration(Configuration config) {
+		this.config = new Configuration(checkNotNull(config));
+	}
+
+	// ------------------------------------------------------------------------
+	//  setters
+	// ------------------------------------------------------------------------
+
+	/** Adds all entries of the given Flink configuration to this configuration. */
+	public void addConfiguration(Configuration config) {
+		checkNotNull(config, "configuration must not be null");
+		this.config.addAll(config);
+	}
+
+	/** Lets all components share one RPC service. */
+	public void setUseSingleRpcService() {
+		singleRpcService = true;
+	}
+
+	/** Gives every component its own RPC service. */
+	public void setUseRpcServicePerComponent() {
+		singleRpcService = false;
+	}
+
+	/** Sets the number of JobManagers; must be at least one. */
+	public void setNumJobManagers(int numJobManagers) {
+		checkArgument(numJobManagers >= 1, "must have at least one JobManager");
+		this.numJobManagers = numJobManagers;
+	}
+
+	/** Sets the number of TaskManagers; must be at least one. */
+	public void setNumTaskManagers(int numTaskManagers) {
+		checkArgument(numTaskManagers >= 1, "must have at least one TaskManager");
+		this.numTaskManagers = numTaskManagers;
+	}
+
+	/** Sets the number of task slots per TaskManager in the Flink configuration; must be at least one. */
+	public void setNumTaskManagerSlots(int numTaskSlots) {
+		checkArgument(numTaskSlots >= 1, "must have at least one task slot per TaskManager");
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numTaskSlots);
+	}
+
+	/** Sets a bind address used for all RPC services, overriding the configured addresses. */
+	public void setCommonRpcBindAddress(String bindAddress) {
+		checkNotNull(bindAddress, "bind address must not be null");
+		commonBindAddress = bindAddress;
+	}
+
+	// ------------------------------------------------------------------------
+	//  getters
+	// ------------------------------------------------------------------------
+
+	/** Returns the Flink configuration (not a copy). */
+	public Configuration getConfiguration() {
+		return config;
+	}
+
+	public boolean getUseSingleRpcSystem() {
+		return singleRpcService;
+	}
+
+	public int getNumJobManagers() {
+		return numJobManagers;
+	}
+
+	public int getNumTaskManagers() {
+		return numTaskManagers;
+	}
+
+	/** Returns the configured number of slots per TaskManager, defaulting to one. */
+	public int getNumSlotsPerTaskManager() {
+		return config.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+	}
+
+	/** Returns the common bind address if set, else the configured JobManager address ("localhost" by default). */
+	public String getJobManagerBindAddress() {
+		if (commonBindAddress != null) {
+			return commonBindAddress;
+		}
+		return config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+	}
+
+	/** Returns the common bind address if set, else the configured TaskManager host ("localhost" by default). */
+	public String getTaskManagerBindAddress() {
+		if (commonBindAddress != null) {
+			return commonBindAddress;
+		}
+		return config.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
+	}
+
+	/** Returns the Akka timeout from the Flink configuration as a {@link Time}. */
+	public Time getRpcTimeout() {
+		final FiniteDuration akkaTimeout = AkkaUtils.getTimeout(config);
+		return Time.of(akkaTimeout.length(), akkaTimeout.unit());
+	}
+
+	// ------------------------------------------------------------------------
+	//  utils
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		final StringBuilder sb = new StringBuilder("MiniClusterConfiguration{");
+		sb.append("singleRpcService=").append(singleRpcService);
+		sb.append(", numJobManagers=").append(numJobManagers);
+		sb.append(", numTaskManagers=").append(numTaskManagers);
+		sb.append(", commonBindAddress='").append(commonBindAddress).append('\'');
+		sb.append(", config=").append(config);
+		sb.append('}');
+		return sb.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/655722a2/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
new file mode 100644
index 0000000..d99eff6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The dispatcher that runs in the mini cluster, waits for jobs, and starts job masters
+ * upon receiving jobs.
+ *
+ * <p>The dispatcher executes at most one job at a time.
+ */
+public class MiniClusterJobDispatcher {
+
+	private static final Logger LOG = LoggerFactory.getLogger(MiniClusterJobDispatcher.class);
+
+	// ------------------------------------------------------------------------
+
+	/** lock to ensure that this dispatcher executes only one job at a time */
+	private final Object lock = new Object();
+
+	/** the configuration with which the mini cluster was started */
+	private final Configuration configuration;
+
+	/** the RPC services to use by the job managers, one per JobManager */
+	private final RpcService[] rpcServices;
+
+	/** services for discovery, leader election, and recovery */
+	private final HighAvailabilityServices haServices;
+
+	/** all the services that the JobManager needs, such as BLOB service, factories, etc */
+	private final JobManagerServices jobManagerServices;
+
+	/** Registry for all metrics in the mini cluster */
+	private final MetricRegistry metricRegistry;
+
+	/** The number of JobManagers to launch (more than one simulates a high-availability setup) */
+	private final int numJobManagers;
+
+	/** The runners for the current job, one per JobManager. Non-null if a job is currently running */
+	private volatile JobManagerRunner[] runners;
+
+	/** flag marking the dispatcher as shut down */
+	private volatile boolean shutdown;
+
+
+	/**
+	 * Starts a mini cluster job dispatcher.
+	 *
+	 * <p>The dispatcher kicks off one JobManager per job, a behavior similar to a
+	 * non-highly-available setup.
+	 *
+	 * @param config The configuration of the mini cluster
+	 * @param rpcService The RPC service used by the single JobManager
+	 * @param haServices Access to the discovery, leader election, and recovery services
+	 * @param metricRegistry The registry for all metrics in the mini cluster
+	 *
+	 * @throws Exception Thrown, if the services for the JobMaster could not be started.
+	 */
+	public MiniClusterJobDispatcher(
+			Configuration config,
+			RpcService rpcService,
+			HighAvailabilityServices haServices,
+			MetricRegistry metricRegistry) throws Exception {
+		this(config, haServices, metricRegistry, 1, new RpcService[] { rpcService });
+	}
+
+	/**
+	 * Starts a mini cluster job dispatcher.
+	 *
+	 * <p>The dispatcher may kick off more than one JobManager per job, thus simulating
+	 * a highly-available setup.
+	 *
+	 * @param config The configuration of the mini cluster
+	 * @param haServices Access to the discovery, leader election, and recovery services
+	 * @param metricRegistry The registry for all metrics in the mini cluster
+	 * @param numJobManagers The number of JobMasters to start for each job.
+	 * @param rpcServices The RPC services for the JobMasters, exactly one per JobMaster
+	 *
+	 * @throws Exception Thrown, if the services for the JobMaster could not be started.
+	 */
+	public MiniClusterJobDispatcher(
+			Configuration config,
+			HighAvailabilityServices haServices,
+			MetricRegistry metricRegistry,
+			int numJobManagers,
+			RpcService[] rpcServices) throws Exception {
+
+		checkArgument(numJobManagers >= 1, "numJobManagers must be at least 1");
+		checkNotNull(rpcServices, "rpcServices");
+		checkArgument(rpcServices.length == numJobManagers,
+				"need exactly one RpcService per JobManager");
+
+		this.configuration = checkNotNull(config);
+		this.rpcServices = rpcServices;
+		this.haServices = checkNotNull(haServices);
+		this.metricRegistry = checkNotNull(metricRegistry);
+		this.numJobManagers = numJobManagers;
+
+		LOG.info("Creating JobMaster services");
+		this.jobManagerServices = JobManagerServices.fromConfiguration(config, haServices);
+	}
+
+	// ------------------------------------------------------------------------
+	//  life cycle
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Shuts down the mini cluster dispatcher. If a job is currently running, that job will be
+	 * terminally failed.
+	 */
+	public void shutdown() {
+		synchronized (lock) {
+			if (!shutdown) {
+				shutdown = true;
+
+				LOG.info("Shutting down the dispatcher");
+
+				// in this shutdown code we copy the references to the stack first,
+				// to avoid concurrent modification
+
+				JobManagerRunner[] runners = this.runners;
+				if (runners != null) {
+					this.runners = null;
+
+					for (JobManagerRunner runner : runners) {
+						runner.shutdown();
+					}
+				}
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  submitting jobs
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This method executes a job in detached mode. The method returns immediately after the
+	 * JobManager runner(s) for the job have been started; it does not wait for the job to finish.
+	 *
+	 * @param job  The Flink job to execute
+	 *
+	 * @throws JobExecutionException Thrown if anything went amiss during initial job launch.
+	 */
+	public void runDetached(JobGraph job) throws JobExecutionException {
+		checkNotNull(job);
+
+		LOG.info("Received job for detached execution {} ({})", job.getName(), job.getJobID());
+
+		synchronized (lock) {
+			checkState(!shutdown, "mini cluster is shut down");
+			checkState(runners == null, "mini cluster can only execute one job at a time");
+
+			DetachedFinalizer finalizer = new DetachedFinalizer(numJobManagers);
+
+			this.runners = startJobRunners(job, finalizer, finalizer);
+		}
+	}
+
+	/**
+	 * This method runs a job in blocking mode. The method returns only after the job
+	 * completed successfully, or after it failed terminally.
+	 *
+	 * @param job  The Flink job to execute
+	 * @return The result of the job execution
+	 *
+	 * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
+	 *         or if the job terminally failed.
+	 * @throws InterruptedException Thrown if the calling thread is interrupted while waiting
+	 *         for the job to complete.
+	 */
+	public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
+		checkNotNull(job);
+
+		LOG.info("Received job for blocking execution {} ({})", job.getName(), job.getJobID());
+		final BlockingJobSync sync = new BlockingJobSync(job.getJobID(), numJobManagers);
+
+		synchronized (lock) {
+			checkState(!shutdown, "mini cluster is shut down");
+			checkState(runners == null, "mini cluster can only execute one job at a time");
+
+			this.runners = startJobRunners(job, sync, sync);
+		}
+
+		try {
+			return sync.getResult();
+		}
+		finally {
+			// always clear the status for the next job
+			runners = null;
+		}
+	}
+
+	/**
+	 * Registers the job as running with the HA services and starts one JobManagerRunner per
+	 * JobManager. If any runner fails to start, all runners created so far are shut down and
+	 * the job is marked as finished in the HA services again.
+	 */
+	private JobManagerRunner[] startJobRunners(
+			JobGraph job,
+			OnCompletionActions onCompletion,
+			FatalErrorHandler errorHandler) throws JobExecutionException {
+
+		LOG.info("Starting {} JobMaster(s) for job {} ({})", numJobManagers, job.getName(), job.getJobID());
+
+		// we first need to mark the job as running in the HA services, so that the
+		// JobManager leader will recognize it as work to do
+		try {
+			haServices.getRunningJobsRegistry().setJobRunning(job.getJobID());
+		}
+		catch (Throwable t) {
+			throw new JobExecutionException(job.getJobID(),
+					"Could not register the job at the high-availability services", t);
+		}
+
+		// start all JobManagers
+		JobManagerRunner[] runners = new JobManagerRunner[numJobManagers];
+		for (int i = 0; i < numJobManagers; i++) {
+			try {
+				runners[i] = new JobManagerRunner(job, configuration,
+						rpcServices[i], haServices, jobManagerServices, metricRegistry,
+						onCompletion, errorHandler);
+				runners[i].start();
+			}
+			catch (Throwable t) {
+				// shut down all runners created so far, including the failed one if it was constructed
+				for (int k = 0; k <= i; k++) {
+					try {
+						if (runners[k] != null) {
+							runners[k].shutdown();
+						}
+					} catch (Throwable ignored) {
+						// silent shutdown, the original failure is reported below
+					}
+				}
+
+				// un-register the job from the high-availability services
+				try {
+					haServices.getRunningJobsRegistry().setJobFinished(job.getJobID());
+				}
+				catch (Throwable tt) {
+					LOG.warn("Could not properly unregister job from high-availability services", tt);
+				}
+
+				throw new JobExecutionException(job.getJobID(), "Could not start the JobManager(s) for the job", t);
+			}
+		}
+
+		return runners;
+	}
+
+	// ------------------------------------------------------------------------
+	//  test methods to simulate job master failures
+	// ------------------------------------------------------------------------
+
+//	public void killJobMaster(int which) {
+//		checkArgument(which >= 0 && which < numJobManagers, "no such job master");
+//		checkState(!shutdown, "mini cluster is shut down");
+//
+//		JobManagerRunner[] runners = this.runners;
+//		checkState(runners != null, "mini cluster it not executing a job right now");
+//
+//		runners[which].shutdown(new Throwable("kill JobManager"));
+//	}
+
+	// ------------------------------------------------------------------------
+	//  utility classes
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Simple class that waits for all runners to have reported that they are done.
+	 * In the case of a high-availability test setup, there may be multiple runners.
+	 * After that, it marks the mini cluster as ready to receive new jobs.
+	 */
+	private class DetachedFinalizer implements OnCompletionActions, FatalErrorHandler {
+
+		private final AtomicInteger numJobManagersToWaitFor;
+
+		private DetachedFinalizer(int numJobManagersToWaitFor) {
+			this.numJobManagersToWaitFor = new AtomicInteger(numJobManagersToWaitFor);
+		}
+
+		@Override
+		public void jobFinished(JobExecutionResult result) {
+			decrementCheckAndCleanup();
+		}
+
+		@Override
+		public void jobFailed(Throwable cause) {
+			decrementCheckAndCleanup();
+		}
+
+		@Override
+		public void jobFinishedByOther() {
+			decrementCheckAndCleanup();
+		}
+
+		@Override
+		public void onFatalError(Throwable exception) {
+			decrementCheckAndCleanup();
+		}
+
+		/** Clears the dispatcher's runners once the last runner has reported back. */
+		private void decrementCheckAndCleanup() {
+			if (numJobManagersToWaitFor.decrementAndGet() == 0) {
+				MiniClusterJobDispatcher.this.runners = null;
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This class is used to sync on blocking jobs across multiple runners.
+	 * Only after all runners reported back that they are finished, the
+	 * result will be released.
+	 *
+	 * <p>That way it is guaranteed that after the blocking job submit call returns,
+	 * the dispatcher is immediately free to accept another job.
+	 */
+	private static class BlockingJobSync implements OnCompletionActions, FatalErrorHandler {
+
+		private final JobID jobId;
+
+		private final CountDownLatch jobMastersToWaitFor;
+
+		private volatile Throwable jobException;
+
+		private volatile Throwable runnerException;
+
+		private volatile JobExecutionResult result;
+
+		BlockingJobSync(JobID jobId, int numJobMastersToWaitFor) {
+			this.jobId = jobId;
+			this.jobMastersToWaitFor = new CountDownLatch(numJobMastersToWaitFor);
+		}
+
+		@Override
+		public void jobFinished(JobExecutionResult jobResult) {
+			this.result = jobResult;
+			jobMastersToWaitFor.countDown();
+		}
+
+		@Override
+		public void jobFailed(Throwable cause) {
+			jobException = cause;
+			jobMastersToWaitFor.countDown();
+		}
+
+		@Override
+		public void jobFinishedByOther() {
+			this.jobMastersToWaitFor.countDown();
+		}
+
+		@Override
+		public void onFatalError(Throwable exception) {
+			// only the first fatal error is kept.
+			// NOTE(review): unlike DetachedFinalizer, this does not count down the latch, so
+			// getResult() still waits for every runner to report through one of the
+			// OnCompletionActions callbacks - confirm runners always do so after a fatal error
+			if (runnerException == null) {
+				runnerException = exception;
+			}
+		}
+
+		/**
+		 * Waits until all runners reported back, then returns the job result or throws the failure.
+		 */
+		public JobExecutionResult getResult() throws JobExecutionException, InterruptedException {
+			jobMastersToWaitFor.await();
+
+			final Throwable jobFailureCause = this.jobException;
+			final Throwable runnerException = this.runnerException;
+			final JobExecutionResult result = this.result;
+
+			// (1) we check if the job terminated with an exception
+			// (2) we check whether the job completed successfully
+			// (3) we check if we have exceptions from the JobManagers. the job may still have
+			//     completed successfully in that case, if multiple JobMasters were running
+			//     and other took over. only if all encounter a fatal error, the job cannot finish
+
+			if (jobFailureCause != null) {
+				if (jobFailureCause instanceof JobExecutionException) {
+					throw (JobExecutionException) jobFailureCause;
+				}
+				else {
+					throw new JobExecutionException(jobId, "The job execution failed", jobFailureCause);
+				}
+			}
+			else if (result != null) {
+				return result;
+			}
+			else if (runnerException != null) {
+				throw new JobExecutionException(jobId,
+						"The job execution failed because all JobManagers encountered fatal errors", runnerException);
+			}
+			else {
+				throw new IllegalStateException("Bug: Job finished with neither error nor result.");
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/655722a2/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 6f6d525..3122804 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -188,7 +188,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		} else {
 			try {
 				LeaderRetrievalService jobMasterLeaderRetriever =
-					highAvailabilityServices.getJobManagerLeaderRetriever(jobID, jobMasterAddress);
+					highAvailabilityServices.getJobManagerLeaderRetriever(jobID);
 				jobIdLeaderListener = new JobIdLeaderListener(jobID, jobMasterLeaderRetriever);
 			} catch (Exception e) {
 				log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);

http://git-wip-us.apache.org/repos/asf/flink/blob/655722a2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index 9e71349..e7f52e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -191,8 +191,7 @@ public class JobLeaderService {
 		LOG.info("Add job {} for job leader monitoring.", jobId);
 
 		final LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(
-			jobId,
-			defaultTargetAddress);
+			jobId);
 
 		JobLeaderService.JobManagerLeaderListener jobManagerLeaderListener = new JobManagerLeaderListener(jobId);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/655722a2/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 3e88e8c..877812b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -91,7 +91,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 	}
 
 	@Override
-	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) throws Exception {
+	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception {
 		LeaderRetrievalService service = this.jobMasterLeaderRetrievers.get(jobID);
 		if (service != null) {
 			return service;

http://git-wip-us.apache.org/repos/asf/flink/blob/655722a2/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
new file mode 100644
index 0000000..dd43337
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+/**
+ * Integration test cases for the {@link MiniCluster}.
+ *
+ * <p>NOTE(review): the {@code @Test} annotations are commented out, so these tests do not
+ * currently run - presumably until the MiniCluster implementation is complete; TODO confirm.
+ */
+public class MiniClusterITCase extends TestLogger {
+
+//	@Test
+	public void runJobWithSingleRpcService() throws Exception {
+		MiniClusterConfiguration cfg = new MiniClusterConfiguration();
+
+		// should be the default, but set anyways to make sure the test
+		// stays valid when the default changes
+		cfg.setUseSingleRpcService();
+
+		MiniCluster miniCluster = new MiniCluster(cfg);
+		executeJob(miniCluster);
+	}
+
+//	@Test
+	public void runJobWithMultipleRpcServices() throws Exception {
+		MiniClusterConfiguration cfg = new MiniClusterConfiguration();
+		cfg.setUseRpcServicePerComponent();
+
+		MiniCluster miniCluster = new MiniCluster(cfg);
+		executeJob(miniCluster);
+	}
+
+//	@Test
+	public void runJobWithMultipleJobManagers() throws Exception {
+		MiniClusterConfiguration cfg = new MiniClusterConfiguration();
+		cfg.setNumJobManagers(3);
+
+		MiniCluster miniCluster = new MiniCluster(cfg);
+		executeJob(miniCluster);
+	}
+
+	/**
+	 * Starts the given mini cluster and runs a simple job on it in blocking mode. The cluster
+	 * is always shut down afterwards, so that whatever it started does not leak into other tests.
+	 */
+	private static void executeJob(MiniCluster miniCluster) throws Exception {
+		try {
+			miniCluster.start();
+
+			JobGraph job = getSimpleJob();
+			miniCluster.runJobBlocking(job);
+		}
+		finally {
+			miniCluster.shutdown();
+		}
+	}
+
+	/** Creates a job with a single vertex (parallelism 1) that runs a {@link NoOpInvokable}. */
+	private static JobGraph getSimpleJob() {
+		JobVertex task = new JobVertex("Test task");
+		task.setParallelism(1);
+		task.setMaxParallelism(1);
+		task.setInvokableClass(NoOpInvokable.class);
+
+		return new JobGraph(new JobID(), "Test Job", task);
+	}
+}


[5/5] flink git commit: [FLINK-4351] [cluster management] JobManager handle TaskManager's registration

Posted by se...@apache.org.
[FLINK-4351] [cluster management] JobManager handle TaskManager's registration


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a19cae3b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a19cae3b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a19cae3b

Branch: refs/heads/flip-6
Commit: a19cae3b07963776c07c0aae7bee806004f59429
Parents: e91b82d
Author: Kurt Young <yk...@gmail.com>
Authored: Sun Oct 16 23:00:57 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Oct 16 22:14:41 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/jobmaster/JobMaster.java      | 75 +++++++++++++++++---
 .../runtime/jobmaster/JobMasterGateway.java     | 15 ++--
 .../runtime/taskexecutor/JobLeaderService.java  | 55 ++++++--------
 .../runtime/taskexecutor/TaskExecutor.java      |  2 +-
 .../taskexecutor/TaskManagerServices.java       |  2 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  | 10 ++-
 6 files changed, 102 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a19cae3b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 05c20d3..8cb9946 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
@@ -36,7 +37,9 @@ import org.apache.flink.runtime.client.SerializedJobExecutionResult;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.Execution;
@@ -81,7 +84,9 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.StartStoppable;
 import org.apache.flink.runtime.state.CheckpointStateHandles;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.SerializedValue;
@@ -89,8 +94,10 @@ import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -122,6 +129,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	/** Configuration of the JobManager */
 	private final Configuration configuration;
 
+	private final Time rpcTimeout;
+
 	/** Service to contend for and retrieve the leadership of JM and RM */
 	private final HighAvailabilityServices highAvailabilityServices;
 
@@ -152,7 +161,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 	private volatile UUID leaderSessionID;
 
-	// --------- resource manager --------
+	// --------- ResourceManager --------
 
 	/** Leader retriever service used to locate ResourceManager's address */
 	private LeaderRetrievalService resourceManagerLeaderRetriever;
@@ -160,6 +169,9 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	/** Connection with ResourceManager, null if not located address yet or we close it initiative */
 	private ResourceManagerConnection resourceManagerConnection;
 
+	// --------- TaskManagers --------
+
+	private final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTaskManagers;
 
 	// ------------------------------------------------------------------------
 
@@ -181,6 +193,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 		this.jobGraph = checkNotNull(jobGraph);
 		this.configuration = checkNotNull(configuration);
+		this.rpcTimeout = rpcAskTimeout;
 		this.highAvailabilityServices = checkNotNull(highAvailabilityService);
 		this.libraryCacheManager = checkNotNull(libraryCacheManager);
 		this.executionContext = checkNotNull(executorService);
@@ -243,6 +256,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 		this.slotPool = new SlotPool(executorService);
 		this.allocationTimeout = Time.of(5, TimeUnit.SECONDS);
+
+		this.registeredTaskManagers = new HashMap<>(4);
 	}
 
 	//----------------------------------------------------------------------------------------------
@@ -379,8 +394,10 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		}
 		closeResourceManagerConnection();
 
-		// TODO: disconnect from all registered task managers
-
+		for (ResourceID taskManagerId : registeredTaskManagers.keySet()) {
+			slotPool.releaseResource(taskManagerId);
+		}
+		registeredTaskManagers.clear();
 	}
 
 	//----------------------------------------------------------------------------------------------
@@ -662,11 +679,53 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	}
 
 	@RpcMethod
-	public RegistrationResponse registerTaskManager(
-		final String taskManagerAddress,
-		final ResourceID taskManagerProcessId,
-		final UUID leaderId) {
-		throw new UnsupportedOperationException("Has to be implemented.");
+	public Future<RegistrationResponse> registerTaskManager(
+			final TaskManagerLocation taskManagerLocation,
+			final UUID leaderId) throws Exception {
+
+		// reject registrations for a different (or absent) leader session; the null-safe check
+		// avoids an NPE while this JobMaster holds no leader session ID
+		if (!isCurrentLeaderSession(leaderId)) {
+			log.warn("Discard registration from TaskExecutor {} at ({}) because the expected " +
+							"leader session ID {} did not equal the received leader session ID {}.",
+					taskManagerLocation.getResourceID(), taskManagerLocation.addressString(),
+					JobMaster.this.leaderSessionID, leaderId);
+			throw new Exception("Leader id not match, expected: " + JobMaster.this.leaderSessionID
+					+ ", actual: " + leaderId);
+		}
+
+		final ResourceID taskManagerId = taskManagerLocation.getResourceID();
+
+		if (registeredTaskManagers.containsKey(taskManagerId)) {
+			// already registered: acknowledge again without re-connecting
+			final RegistrationResponse response = new JMTMRegistrationSuccess(
+					taskManagerId, libraryCacheManager.getBlobServerPort());
+			return FlinkCompletableFuture.completed(response);
+		} else {
+			// connecting blocks until the gateway is resolved (or the timeout hits), so it runs
+			// asynchronously via the RPC service; the registration is completed in the main thread
+			return getRpcService().execute(new Callable<TaskExecutorGateway>() {
+				@Override
+				public TaskExecutorGateway call() throws Exception {
+					return getRpcService().connect(taskManagerLocation.addressString(), TaskExecutorGateway.class)
+							.get(rpcTimeout.getSize(), rpcTimeout.getUnit());
+				}
+			}).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
+				@Override
+				public RegistrationResponse apply(TaskExecutorGateway taskExecutorGateway, Throwable throwable) {
+					if (throwable != null) {
+						return new RegistrationResponse.Decline(throwable.getMessage());
+					}
+
+					// the leader session may have changed while connecting, so check again
+					if (!isCurrentLeaderSession(leaderId)) {
+						log.warn("Discard registration from TaskExecutor {} at ({}) because the expected " +
+										"leader session ID {} did not equal the received leader session ID {}.",
+								taskManagerLocation.getResourceID(), taskManagerLocation.addressString(),
+								JobMaster.this.leaderSessionID, leaderId);
+						return new RegistrationResponse.Decline("Invalid leader session id");
+					}
+
+					slotPool.registerResource(taskManagerId);
+					registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, taskExecutorGateway));
+					return new JMTMRegistrationSuccess(taskManagerId, libraryCacheManager.getBlobServerPort());
+				}
+			}, getMainThreadExecutor());
+		}
 	}
+
+	/**
+	 * Checks whether the given leader session ID equals the one this JobMaster currently holds.
+	 * Returns false if this JobMaster currently holds no leader session ID.
+	 */
+	private boolean isCurrentLeaderSession(UUID leaderId) {
+		final UUID currentLeaderSessionId = this.leaderSessionID;
+		return currentLeaderSessionId != null && currentLeaderSessionId.equals(leaderId);
+	}
 
 	//----------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a19cae3b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 0f155a4..4c85839 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import java.util.UUID;
 
@@ -195,15 +196,13 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 	/**
 	 * Register the task manager at the job manager.
 	 *
-	 * @param taskManagerAddress address of the task manager
-	 * @param taskManagerProcessId identifying the task manager
-	 * @param leaderId identifying the job leader
-	 * @param timeout for the rpc call
+	 * @param taskManagerLocation location of the task manager, providing its resource ID and address
+	 * @param leaderId            leader session ID of the job leader the task manager registers with
+	 * @param timeout             timeout for the rpc call
 	 * @return Future registration response indicating whether the registration was successful or not
 	 */
 	Future<RegistrationResponse> registerTaskManager(
-		final String taskManagerAddress,
-		final ResourceID taskManagerProcessId,
-		final UUID leaderId,
-		@RpcTimeout final Time timeout);
+			final TaskManagerLocation taskManagerLocation,
+			final UUID leaderId,
+			@RpcTimeout final Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a19cae3b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index e7f52e2..14d36ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.taskexecutor;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
@@ -32,6 +31,7 @@ import org.apache.flink.runtime.registration.RegisteredRpcConnection;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.registration.RetryingRegistration;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,8 +53,8 @@ public class JobLeaderService {
 
 	private static final Logger LOG = LoggerFactory.getLogger(JobLeaderService.class);
 
-	/** Process id of the owning process */
-	private final ResourceID ownerProcessId;
+	/** Self's location, used for the job manager connection */
+	private final TaskManagerLocation ownLocation;
 
 	/** The leader retrieval service and listener for each registered job */
 	private final Map<JobID, Tuple2<LeaderRetrievalService, JobLeaderService.JobManagerLeaderListener>> jobLeaderServices;
@@ -62,9 +62,6 @@ public class JobLeaderService {
 	/** Internal state of the service */
 	private volatile JobLeaderService.State state;
 
-	/** Address of the owner of this service. This address is used for the job manager connection */
-	private String ownerAddress;
-
 	/** Rpc service to use for establishing connections */
 	private RpcService rpcService;
 
@@ -74,14 +71,13 @@ public class JobLeaderService {
 	/** Job leader listener listening for job leader changes */
 	private JobLeaderListener jobLeaderListener;
 
-	public JobLeaderService(ResourceID ownerProcessId) {
-		this.ownerProcessId = Preconditions.checkNotNull(ownerProcessId);
+	public JobLeaderService(TaskManagerLocation location) {
+		this.ownLocation = Preconditions.checkNotNull(location);
 
 		jobLeaderServices = new HashMap<>(4);
 
 		state = JobLeaderService.State.CREATED;
 
-		ownerAddress = null;
 		rpcService = null;
 		highAvailabilityServices = null;
 		jobLeaderListener = null;
@@ -94,13 +90,11 @@ public class JobLeaderService {
 	/**
 	 * Start the job leader service with the given services.
 	 *
-	 * @param initialOwnerAddress to be used for establishing connections (source address)
 	 * @param initialRpcService to be used to create rpc connections
 	 * @param initialHighAvailabilityServices to create leader retrieval services for the different jobs
 	 * @param initialJobLeaderListener listening for job leader changes
 	 */
 	public void start(
-		final String initialOwnerAddress,
 		final RpcService initialRpcService,
 		final HighAvailabilityServices initialHighAvailabilityServices,
 		final JobLeaderListener initialJobLeaderListener) {
@@ -110,7 +104,6 @@ public class JobLeaderService {
 		} else {
 			LOG.info("Start job leader service.");
 
-			this.ownerAddress = Preconditions.checkNotNull(initialOwnerAddress);
 			this.rpcService = Preconditions.checkNotNull(initialRpcService);
 			this.highAvailabilityServices = Preconditions.checkNotNull(initialHighAvailabilityServices);
 			this.jobLeaderListener = Preconditions.checkNotNull(initialJobLeaderListener);
@@ -311,14 +304,13 @@ public class JobLeaderService {
 			@Override
 			protected RetryingRegistration<JobMasterGateway, JMTMRegistrationSuccess> generateRegistration() {
 				return new JobLeaderService.JobManagerRetryingRegistration(
-					LOG,
-					rpcService,
-					"JobManager",
-					JobMasterGateway.class,
-					getTargetAddress(),
-					getTargetLeaderId(),
-					ownerAddress,
-					ownerProcessId);
+						LOG,
+						rpcService,
+						"JobManager",
+						JobMasterGateway.class,
+						getTargetAddress(),
+						getTargetLeaderId(),
+						ownLocation);
 			}
 
 			@Override
@@ -349,10 +341,11 @@ public class JobLeaderService {
 	/**
 	 * Retrying registration for the job manager <--> task manager connection.
 	 */
-	private static final class JobManagerRetryingRegistration extends RetryingRegistration<JobMasterGateway, JMTMRegistrationSuccess> {
+	private static final class JobManagerRetryingRegistration
+			extends RetryingRegistration<JobMasterGateway, JMTMRegistrationSuccess>
+	{
 
-		private final String taskManagerAddress;
-		private final ResourceID taskManagerProcessId;
+		private final TaskManagerLocation taskManagerLocation;
 
 		JobManagerRetryingRegistration(
 			Logger log,
@@ -361,22 +354,18 @@ public class JobLeaderService {
 			Class<JobMasterGateway> targetType,
 			String targetAddress,
 			UUID leaderId,
-			String taskManagerAddress,
-			ResourceID taskManagerProcessId) {
+			TaskManagerLocation taskManagerLocation) {
 
 			super(log, rpcService, targetName, targetType, targetAddress, leaderId);
 
-			this.taskManagerAddress = Preconditions.checkNotNull(taskManagerAddress);
-			this.taskManagerProcessId = Preconditions.checkNotNull(taskManagerProcessId);
+			this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
 		}
 
 		@Override
-		protected Future<RegistrationResponse> invokeRegistration(JobMasterGateway gateway, UUID leaderId, long timeoutMillis) throws Exception {
-			return gateway.registerTaskManager(
-				taskManagerAddress,
-				taskManagerProcessId,
-				leaderId,
-				Time.milliseconds(timeoutMillis));
+		protected Future<RegistrationResponse> invokeRegistration(
+				JobMasterGateway gateway, UUID leaderId, long timeoutMillis) throws Exception
+		{
+			return gateway.registerTaskManager(taskManagerLocation, leaderId, Time.milliseconds(timeoutMillis));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a19cae3b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 3e3a544..1201281 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -206,7 +206,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		taskSlotTable.start(new SlotActionsImpl());
 
 		// start the job leader service
-		jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
+		jobLeaderService.start(getRpcService(), haServices, new JobLeaderListenerImpl());
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/a19cae3b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 7575ba3..e8de1b1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -207,7 +207,7 @@ public class TaskManagerServices {
 
 		final JobManagerTable jobManagerTable = new JobManagerTable();
 
-		final JobLeaderService jobLeaderService = new JobLeaderService(resourceID);
+		final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
 		
 		return new TaskManagerServices(
 			taskManagerLocation,

http://git-wip-us.apache.org/repos/asf/flink/blob/a19cae3b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 23c6833..2220f12 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -350,7 +350,7 @@ public class TaskExecutorTest extends TestLogger {
 		final TimerService<AllocationID> timerService = mock(TimerService.class);
 		final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(mock(ResourceProfile.class)), timerService);
 		final JobManagerTable jobManagerTable = new JobManagerTable();
-		final JobLeaderService jobLeaderService = new JobLeaderService(resourceId);
+		final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
 		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
 
 		final TestingLeaderRetrievalService resourceManagerLeaderRetrievalService = new TestingLeaderRetrievalService();
@@ -379,8 +379,7 @@ public class TaskExecutorTest extends TestLogger {
 		final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
 
 		when(jobMasterGateway.registerTaskManager(
-			any(String.class),
-			eq(resourceId),
+			eq(taskManagerLocation),
 			eq(jobManagerLeaderId),
 			any(Time.class)
 		)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
@@ -451,7 +450,7 @@ public class TaskExecutorTest extends TestLogger {
 		final TimerService<AllocationID> timerService = mock(TimerService.class);
 		final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(mock(ResourceProfile.class), mock(ResourceProfile.class)), timerService);
 		final JobManagerTable jobManagerTable = new JobManagerTable();
-		final JobLeaderService jobLeaderService = new JobLeaderService(resourceId);
+		final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
 		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
 
 		final String resourceManagerAddress = "rm";
@@ -484,8 +483,7 @@ public class TaskExecutorTest extends TestLogger {
 		final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
 
 		when(jobMasterGateway.registerTaskManager(
-			any(String.class),
-			eq(resourceId),
+			eq(taskManagerLocation),
 			eq(jobManagerLeaderId),
 			any(Time.class)
 		)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));


[4/5] flink git commit: [FLINK-4689] [cluster management] Implement a simple slot provider for the new job manager

Posted by se...@apache.org.
[FLINK-4689] [cluster management] Implement a simple slot provider for the new job manager


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e91b82d3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e91b82d3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e91b82d3

Branch: refs/heads/flip-6
Commit: e91b82d3c868e18611064f905b345906f1414f84
Parents: 655722a
Author: Kurt Young <yk...@gmail.com>
Authored: Sun Oct 16 22:20:38 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Oct 16 22:14:41 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/instance/SlotPool.java |  1 -
 .../jobmanager/slots/PooledSlotProvider.java    | 73 ++++++++++++++++++++
 .../flink/runtime/jobmaster/JobMaster.java      | 24 ++++---
 3 files changed, 89 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e91b82d3/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index e7857c1..de952c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -135,7 +135,6 @@ public class SlotPool implements SlotOwner {
 
 		internalAllocateSlot(jobID, allocationID, resourceProfile, future);
 
-		final SlotOwner owner = this;
 		return future.thenApplyAsync(
 			new ApplyFunction<SlotDescriptor, SimpleSlot>() {
 				@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e91b82d3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java
new file mode 100644
index 0000000..5655fc2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.slots;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.SlotPool;
+import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A simple pool based slot provider with {@link SlotPool} as the underlying storage.
+ *
+ * <p>Slot allocation is currently performed synchronously: {@link #allocateSlot} blocks the
+ * calling thread for up to the configured timeout and returns an already completed future.
+ */
+public class PooledSlotProvider implements SlotProvider {
+
+	/** The pool which holds all the slots. */
+	private final SlotPool slotPool;
+
+	/** The maximum time {@link #allocateSlot} waits for the pool to provide a slot. */
+	private final Time timeout;
+
+	/**
+	 * Creates a slot provider backed by the given pool.
+	 *
+	 * @param slotPool the pool to allocate slots from
+	 * @param timeout  the maximum time to wait for a single slot allocation
+	 * @throws NullPointerException if either argument is null
+	 */
+	public PooledSlotProvider(final SlotPool slotPool, final Time timeout) {
+		this.slotPool = checkNotNull(slotPool);
+		this.timeout = checkNotNull(timeout);
+	}
+
+	/**
+	 * Allocates a slot for the given task from the slot pool, blocking until the pool provides
+	 * one or the timeout expires.
+	 *
+	 * @param task        the task to allocate a slot for; its job id is used for the pool request
+	 * @param allowQueued currently ignored by this provider
+	 * @return a future that is already completed with the allocated slot
+	 * @throws NoResourceAvailableException if the allocation fails, times out, or the calling
+	 *                                      thread is interrupted while waiting
+	 */
+	@Override
+	public Future<SimpleSlot> allocateSlot(ScheduledUnit task,
+			boolean allowQueued) throws NoResourceAvailableException
+	{
+		checkNotNull(task);
+
+		final JobID jobID = task.getTaskToExecute().getVertex().getJobId();
+		final Future<SimpleSlot> future = slotPool.allocateSimpleSlot(jobID, ResourceProfile.UNKNOWN);
+		// NOTE(review): on timeout or interruption the pending pool request is not cancelled here;
+		// confirm whether a slot delivered later is returned to the pool or leaked.
+		try {
+			final SimpleSlot slot = future.get(timeout.getSize(), timeout.getUnit());
+			return FlinkCompletableFuture.completed(slot);
+		} catch (InterruptedException e) {
+			// restore the interrupt flag so code further up the stack can still observe it
+			Thread.currentThread().interrupt();
+			throw withCause(new NoResourceAvailableException(
+					"Could not allocate a slot because it's interrupted."), e);
+		} catch (ExecutionException e) {
+			// keep the actual allocation failure (unwrapped if present) for diagnosis
+			final Throwable cause = e.getCause() != null ? e.getCause() : e;
+			throw withCause(new NoResourceAvailableException("Could not allocate a slot because some error occurred " +
+					"during allocation, " + e.getMessage()), cause);
+		} catch (TimeoutException e) {
+			throw withCause(new NoResourceAvailableException(
+					"Could not allocate a slot within time limit: " + timeout), e);
+		}
+	}
+
+	/**
+	 * Attaches {@code cause} to a {@link NoResourceAvailableException} that was built from a
+	 * message only, so the original failure and its stack trace are not lost.
+	 *
+	 * @param exception the exception to enrich; must not already have a cause set
+	 * @param cause     the underlying failure
+	 * @return the same exception instance, for use in a {@code throw} statement
+	 */
+	private static NoResourceAvailableException withCause(
+			NoResourceAvailableException exception, Throwable cause) {
+		exception.initCause(cause);
+		return exception;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e91b82d3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index a7be476..05c20d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -49,6 +49,7 @@ import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.instance.SlotPool;
 import org.apache.flink.runtime.io.network.PartitionState;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -56,7 +57,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.jobmanager.slots.PooledSlotProvider;
 import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -84,7 +85,6 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.SerializedValue;
-
 import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
@@ -93,6 +93,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -145,6 +146,9 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	/** The execution graph of this job */
 	private final ExecutionGraph executionGraph;
 
+	private final SlotPool slotPool;
+
+	private final Time allocationTimeout;
 
 	private volatile UUID leaderSessionID;
 
@@ -156,8 +160,6 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	/** Connection with ResourceManager, null if not located address yet or we close it initiative */
 	private ResourceManagerConnection resourceManagerConnection;
 
-	// TODO - we need to replace this with the slot pool
-	private final Scheduler scheduler;
 
 	// ------------------------------------------------------------------------
 
@@ -239,8 +241,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 				-1,
 				log);
 
-		// TODO - temp fix
-		this.scheduler = new Scheduler(executorService);
+		this.slotPool = new SlotPool(executorService);
+		this.allocationTimeout = Time.of(5, TimeUnit.SECONDS);
 	}
 
 	//----------------------------------------------------------------------------------------------
@@ -262,6 +264,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		if (LEADER_ID_UPDATER.compareAndSet(this, null, leaderSessionID)) {
 			super.start();
 
+			slotPool.setJobManagerLeaderId(leaderSessionID);
 			log.info("Starting JobManager for job {} ({})", jobGraph.getName(), jobGraph.getJobID());
 			getSelf().startJobExecution();
 		} else {
@@ -337,7 +340,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			@Override
 			public void run() {
 				try {
-					executionGraph.scheduleForExecution(scheduler);
+					executionGraph.scheduleForExecution(new PooledSlotProvider(slotPool, allocationTimeout));
 				} catch (Throwable t) {
 					executionGraph.fail(t);
 				}
@@ -365,6 +368,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		((StartStoppable) getSelf()).stop();
 
 		leaderSessionID = null;
+		slotPool.setJobManagerLeaderId(null);
 		executionGraph.suspend(cause);
 
 		// disconnect from resource manager:
@@ -783,9 +787,12 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 				// TODO - add tests for comment in https://github.com/apache/flink/pull/2565
 				// verify the response with current connection
 				if (resourceManagerConnection != null
-						&& resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId())) {
+						&& resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId()))
+				{
 					log.info("JobManager successfully registered at ResourceManager, leader id: {}.",
 							success.getResourceManagerLeaderId());
+					slotPool.setResourceManager(success.getResourceManagerLeaderId(),
+							resourceManagerConnection.getTargetGateway());
 				}
 			}
 		});
@@ -796,6 +803,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			resourceManagerConnection.close();
 			resourceManagerConnection = null;
 		}
+		slotPool.disconnectResourceManager();
 	}
 
 	//----------------------------------------------------------------------------------------------