You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/16 21:38:42 UTC
[1/5] flink git commit: [hotfix] [tests] Migrate some test tasks to Java
Repository: flink
Updated Branches:
refs/heads/flip-6 2486d3787 -> a19cae3b0
[hotfix] [tests] Migrate some test tasks to Java
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/da16b0a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/da16b0a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/da16b0a7
Branch: refs/heads/flip-6
Commit: da16b0a7b1bf5d771d07428ae485048ce540bbf7
Parents: 2486d37
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Oct 14 23:54:29 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Oct 16 22:01:32 2016 +0200
----------------------------------------------------------------------
.../StackTraceSampleCoordinatorITCase.java | 8 ++--
.../checkpoint/CoordinatorShutdownTest.java | 8 ++--
.../ExecutionGraphMetricsTest.java | 4 +-
.../ExecutionGraphRestartTest.java | 23 +++++------
.../runtime/jobmanager/JobManagerTest.java | 9 +++--
.../flink/runtime/jobmanager/JobSubmitTest.java | 9 ++---
.../runtime/taskmanager/TaskManagerTest.java | 41 ++++++++++----------
.../testtasks/BlockingNoOpInvokable.java | 39 +++++++++++++++++++
.../flink/runtime/testtasks/NoOpInvokable.java | 30 ++++++++++++++
.../runtime/testtasks/WaitingNoOpInvokable.java | 34 ++++++++++++++++
.../TaskManagerLossFailsTasksTest.scala | 3 +-
.../runtime/jobmanager/JobManagerITCase.scala | 1 +
.../apache/flink/runtime/jobmanager/Tasks.scala | 20 ----------
.../JobSubmissionFailsITCase.java | 6 +--
.../JobManagerHACheckpointRecoveryITCase.java | 4 +-
.../JobManagerHAJobGraphRecoveryITCase.java | 4 +-
.../jobmanager/JobManagerFailsITCase.scala | 2 +-
.../taskmanager/TaskManagerFailsITCase.scala | 3 +-
18 files changed, 170 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
index 9b1f608..d74af08 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.webmonitor;
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
-import org.apache.flink.api.common.ExecutionConfig;
+
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
@@ -31,14 +31,16 @@ import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.util.TestLogger;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
@@ -81,7 +83,7 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger {
final int parallelism = 1;
final JobVertex task = new JobVertex("Task");
- task.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
+ task.setInvokableClass(BlockingNoOpInvokable.class);
task.setParallelism(parallelism);
jobGraph.addVertex(task);
http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index ea4d322..a346a80 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -28,11 +28,13 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
-import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+
import org.junit.Test;
+
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
@@ -58,7 +60,7 @@ public class CoordinatorShutdownTest {
// build a test graph with snapshotting enabled
JobVertex vertex = new JobVertex("Test Vertex");
- vertex.setInvokableClass(Tasks.NoOpInvokable.class);
+ vertex.setInvokableClass(NoOpInvokable.class);
List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID());
JobGraph testGraph = new JobGraph("test job", vertex);
@@ -110,7 +112,7 @@ public class CoordinatorShutdownTest {
// build a test graph with snapshotting enabled
JobVertex vertex = new JobVertex("Test Vertex");
- vertex.setInvokableClass(Tasks.NoOpInvokable.class);
+ vertex.setInvokableClass(NoOpInvokable.class);
List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID());
JobGraph testGraph = new JobGraph("test job", vertex);
http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 70c2bf9..1f8845d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -41,13 +41,13 @@ import org.apache.flink.runtime.instance.Slot;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.messages.Messages;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
@@ -89,7 +89,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
JobVertex jobVertex = new JobVertex("TestVertex");
jobVertex.setParallelism(parallelism);
- jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+ jobVertex.setInvokableClass(NoOpInvokable.class);
JobGraph jobGraph = new JobGraph("Test Job", jobVertex);
Configuration config = new Configuration();
http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 0d09e38..9fbda51 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -36,15 +36,16 @@ import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
+
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
@@ -102,8 +103,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
scheduler.newInstanceAvailable(instance);
- JobVertex groupVertex = newJobVertex("Task1", NUM_TASKS, Tasks.NoOpInvokable.class);
- JobVertex groupVertex2 = newJobVertex("Task2", NUM_TASKS, Tasks.NoOpInvokable.class);
+ JobVertex groupVertex = newJobVertex("Task1", NUM_TASKS, NoOpInvokable.class);
+ JobVertex groupVertex2 = newJobVertex("Task2", NUM_TASKS, NoOpInvokable.class);
SlotSharingGroup sharingGroup = new SlotSharingGroup();
groupVertex.setSlotSharingGroup(sharingGroup);
@@ -237,7 +238,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
new FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE));
JobVertex jobVertex = new JobVertex("NoOpInvokable");
- jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+ jobVertex.setInvokableClass(NoOpInvokable.class);
jobVertex.setParallelism(NUM_TASKS);
JobGraph jobGraph = new JobGraph("TestJob", jobVertex);
@@ -369,8 +370,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
scheduler.newInstanceAvailable(instance);
- JobVertex sender = newJobVertex("Task1", 1, Tasks.NoOpInvokable.class);
- JobVertex receiver = newJobVertex("Task2", 1, Tasks.NoOpInvokable.class);
+ JobVertex sender = newJobVertex("Task1", 1, NoOpInvokable.class);
+ JobVertex receiver = newJobVertex("Task2", 1, NoOpInvokable.class);
JobGraph jobGraph = new JobGraph("Pointwise job", sender, receiver);
ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000));
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
@@ -432,7 +433,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
scheduler.newInstanceAvailable(instance);
- JobVertex vertex = newJobVertex("Test Vertex", 1, Tasks.NoOpInvokable.class);
+ JobVertex vertex = newJobVertex("Test Vertex", 1, NoOpInvokable.class);
ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(
@@ -477,7 +478,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
scheduler.newInstanceAvailable(instance);
- JobVertex vertex = newJobVertex("Test Vertex", 1, Tasks.NoOpInvokable.class);
+ JobVertex vertex = newJobVertex("Test Vertex", 1, NoOpInvokable.class);
ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(
@@ -525,7 +526,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
scheduler.newInstanceAvailable(instance);
JobVertex sender = new JobVertex("Task");
- sender.setInvokableClass(Tasks.NoOpInvokable.class);
+ sender.setInvokableClass(NoOpInvokable.class);
sender.setParallelism(NUM_TASKS);
JobGraph jobGraph = new JobGraph("Pointwise job", sender);
@@ -639,7 +640,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
scheduler.newInstanceAvailable(instance);
- JobVertex sender = newJobVertex("Task", NUM_TASKS, Tasks.NoOpInvokable.class);
+ JobVertex sender = newJobVertex("Task", NUM_TASKS, NoOpInvokable.class);
JobGraph jobGraph = new JobGraph("Pointwise job", sender);
@@ -656,7 +657,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
return new Tuple2<>(eg, instance);
}
- private static JobVertex newJobVertex(String task1, int numTasks, Class<Tasks.NoOpInvokable> invokable) {
+ private static JobVertex newJobVertex(String task1, int numTasks, Class<NoOpInvokable> invokable) {
JobVertex groupVertex = new JobVertex(task1);
groupVertex.setInvokableClass(invokable);
groupVertex.setParallelism(numTasks);
http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 183477a..3019b06 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -80,6 +80,7 @@ import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist;
import org.apache.flink.runtime.testingUtils.TestingTaskManager;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testutils.StoppableInvokable;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -169,7 +170,7 @@ public class JobManagerTest {
// Create a task
final JobVertex sender = new JobVertex("Sender");
sender.setParallelism(1);
- sender.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block
+ sender.setInvokableClass(BlockingNoOpInvokable.class); // just block
sender.createAndAddResultDataSet(rid, PIPELINED);
final JobGraph jobGraph = new JobGraph("Blocking test job", sender);
@@ -340,7 +341,7 @@ public class JobManagerTest {
// Create a task
final JobVertex sender = new JobVertex("Sender");
sender.setParallelism(1);
- sender.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block
+ sender.setInvokableClass(BlockingNoOpInvokable.class); // just block
final JobGraph jobGraph = new JobGraph("Non-Stoppable batching test job", sender);
final JobID jid = jobGraph.getJobID();
@@ -442,11 +443,11 @@ public class JobManagerTest {
JobGraph jobGraph = new JobGraph("croissant");
JobVertex jobVertex1 = new JobVertex("cappuccino");
jobVertex1.setParallelism(4);
- jobVertex1.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
+ jobVertex1.setInvokableClass(BlockingNoOpInvokable.class);
JobVertex jobVertex2 = new JobVertex("americano");
jobVertex2.setParallelism(4);
- jobVertex2.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
+ jobVertex2.setInvokableClass(BlockingNoOpInvokable.class);
jobGraph.addVertex(jobVertex1);
jobGraph.addVertex(jobVertex2);
http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 959b9a7..f793524 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -18,9 +18,8 @@
package org.apache.flink.runtime.jobmanager;
-import akka.actor.ActorRef;
import akka.actor.ActorSystem;
-import org.apache.flink.api.common.ExecutionConfig;
+
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
@@ -28,12 +27,12 @@ import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.NetUtils;
@@ -108,7 +107,7 @@ public class JobSubmitTest {
try {
// create a simple job graph
JobVertex jobVertex = new JobVertex("Test Vertex");
- jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+ jobVertex.setInvokableClass(NoOpInvokable.class);
JobGraph jg = new JobGraph("test job", jobVertex);
// request the blob port from the job manager
@@ -172,7 +171,7 @@ public class JobSubmitTest {
}
};
- jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+ jobVertex.setInvokableClass(NoOpInvokable.class);
JobGraph jg = new JobGraph("test job", jobVertex);
// submit the job
http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index d1909fe..22a68b1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -67,6 +67,7 @@ import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testutils.StoppableInvokable;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.SerializedValue;
@@ -1099,25 +1100,25 @@ public class TaskManagerTest extends TestLogger {
// Single blocking task
final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
- new JobID(),
- new AllocationID(),
- "Job",
- new JobVertexID(),
- new ExecutionAttemptID(),
- new SerializedValue<>(new ExecutionConfig()),
- "Task",
- 1,
- 0,
- 1,
- 0,
- new Configuration(),
- new Configuration(),
- Tasks.BlockingNoOpInvokable.class.getName(),
- Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
- Collections.<InputGateDeploymentDescriptor>emptyList(),
- Collections.<BlobKey>emptyList(),
- Collections.<URL>emptyList(),
- 0);
+ new JobID(),
+ new AllocationID(),
+ "Job",
+ new JobVertexID(),
+ new ExecutionAttemptID(),
+ new SerializedValue<>(new ExecutionConfig()),
+ "Task",
+ 1,
+ 0,
+ 1,
+ 0,
+ new Configuration(),
+ new Configuration(),
+ BlockingNoOpInvokable.class.getName(),
+ Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+ Collections.<InputGateDeploymentDescriptor>emptyList(),
+ Collections.<BlobKey>emptyList(),
+ Collections.<URL>emptyList(),
+ 0);
// Submit the task
new Within(d) {
@@ -1220,7 +1221,7 @@ public class TaskManagerTest extends TestLogger {
// Look for BlockingNoOpInvokable#invoke
for (StackTraceElement elem : trace) {
if (elem.getClassName().equals(
- Tasks.BlockingNoOpInvokable.class.getName())) {
+ BlockingNoOpInvokable.class.getName())) {
assertEquals("invoke", elem.getMethodName());
success = true;
http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/BlockingNoOpInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/BlockingNoOpInvokable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/BlockingNoOpInvokable.java
new file mode 100644
index 0000000..c9adba8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/BlockingNoOpInvokable.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testtasks;
+
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+/**
+ * A task that does nothing but blocks indefinitely, until the executing thread is interrupted.
+ */
+public class BlockingNoOpInvokable extends AbstractInvokable {
+
+ @Override
+ public void invoke() throws Exception {
+ final Object o = new Object();
+ //noinspection SynchronizationOnLocalVariableOrMethodParameter
+ synchronized (o) {
+ //noinspection InfiniteLoopStatement
+ while (true) {
+ o.wait();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/NoOpInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/NoOpInvokable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/NoOpInvokable.java
new file mode 100644
index 0000000..fa9949a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/NoOpInvokable.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testtasks;
+
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+/**
+ * A simple task that does nothing and finishes immediately.
+ */
+public class NoOpInvokable extends AbstractInvokable {
+
+ @Override
+ public void invoke() {}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/WaitingNoOpInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/WaitingNoOpInvokable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/WaitingNoOpInvokable.java
new file mode 100644
index 0000000..de7d59a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/WaitingNoOpInvokable.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testtasks;
+
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+/**
+ * A simple task that does nothing and finishes after a short delay of 100 milliseconds.
+ */
+public class WaitingNoOpInvokable extends AbstractInvokable {
+
+ private static final long waitingTime = 100L;
+
+ @Override
+ public void invoke() throws Exception {
+ Thread.sleep(waitingTime);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
index c30d244..ff0286d 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertex}
import org.apache.flink.runtime.jobmanager.Tasks
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
import org.apache.flink.runtime.testingUtils.TestingUtils
+import org.apache.flink.runtime.testtasks.NoOpInvokable
import org.apache.flink.util.SerializedValue
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
@@ -48,7 +49,7 @@ class TaskManagerLossFailsTasksTest extends WordSpecLike with Matchers {
scheduler.newInstanceAvailable(instance2)
val sender = new JobVertex("Task")
- sender.setInvokableClass(classOf[Tasks.NoOpInvokable])
+ sender.setInvokableClass(classOf[NoOpInvokable])
sender.setParallelism(20)
val jobGraph = new JobGraph("Pointwise job", sender)
http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 0569297..4aa0565 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.{NoResourceAvailableExcepti
import org.apache.flink.runtime.messages.JobManagerMessages._
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
+import org.apache.flink.runtime.testtasks._
import org.junit.runner.RunWith
import org.mockito.Mockito
import org.mockito.Mockito._
http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
index 87c123a..fabd66b 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
@@ -25,26 +25,6 @@ import org.apache.flink.types.IntValue
object Tasks {
- class BlockingNoOpInvokable extends AbstractInvokable {
- override def invoke(): Unit = {
- val o = new Object()
- o.synchronized{
- o.wait()
- }
- }
- }
-
- class NoOpInvokable extends AbstractInvokable{
- override def invoke(): Unit = {}
- }
-
- class WaitingNoOpInvokable extends AbstractInvokable{
- val waitingTime = 100L
-
- override def invoke(): Unit = {
- Thread.sleep(waitingTime)
- }
- }
class Sender extends AbstractInvokable{
http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
index 178656d..256b1ae 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
@@ -25,9 +25,9 @@ import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -63,7 +63,7 @@ public class JobSubmissionFailsITCase {
cluster.start();
final JobVertex jobVertex = new JobVertex("Working job vertex.");
- jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+ jobVertex.setInvokableClass(NoOpInvokable.class);
workingJobGraph = new JobGraph("Working testing job", jobVertex);
}
catch (Exception e) {
@@ -113,7 +113,7 @@ public class JobSubmissionFailsITCase {
public void testExceptionInInitializeOnMaster() {
try {
final JobVertex failingJobVertex = new FailingJobVertex("Failing job vertex");
- failingJobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+ failingJobVertex.setInvokableClass(NoOpInvokable.class);
final JobGraph failingJobGraph = new JobGraph("Failing testing job", failingJobVertex);
http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index 49eaeb7..92b90da 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -31,10 +31,10 @@ import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.leaderelection.TestingListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
import org.apache.flink.runtime.testutils.JobManagerProcess;
@@ -372,7 +372,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
// BLocking JobGraph
JobVertex blockingVertex = new JobVertex("Blocking vertex");
- blockingVertex.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
+ blockingVertex.setInvokableClass(BlockingNoOpInvokable.class);
JobGraph jobGraph = new JobGraph(blockingVertex);
// Submit the job in detached mode
http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
index bf39c4b..236e922 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
@@ -38,7 +38,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
-import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.leaderelection.TestingListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -48,6 +47,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
import org.apache.flink.runtime.testutils.JobManagerProcess;
@@ -461,7 +461,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
JobGraph jobGraph = new JobGraph("Blocking program");
JobVertex jobVertex = new JobVertex("Blocking Vertex");
- jobVertex.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
+ jobVertex.setInvokableClass(BlockingNoOpInvokable.class);
jobGraph.addVertex(jobVertex);
http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index 1b2838d..258282e 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -23,7 +23,7 @@ import akka.testkit.{ImplicitSender, TestKit}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
import org.apache.flink.runtime.jobgraph.{JobGraph, JobVertex}
-import org.apache.flink.runtime.jobmanager.Tasks.{BlockingNoOpInvokable, NoOpInvokable}
+import org.apache.flink.runtime.testtasks.{BlockingNoOpInvokable, NoOpInvokable}
import org.apache.flink.runtime.messages.JobManagerMessages._
import org.apache.flink.runtime.messages.Messages.Acknowledge
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered
http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index 3b39b3f..e141cc2 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -25,7 +25,8 @@ import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
import org.apache.flink.runtime.client.JobExecutionException
import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex}
-import org.apache.flink.runtime.jobmanager.Tasks.{BlockingNoOpInvokable, BlockingReceiver, NoOpInvokable, Sender}
+import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender}
+import org.apache.flink.runtime.testtasks.{BlockingNoOpInvokable, NoOpInvokable}
import org.apache.flink.runtime.messages.JobManagerMessages._
import org.apache.flink.runtime.messages.TaskManagerMessages.{NotifyWhenRegisteredAtJobManager, RegisteredAtJobManager}
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
[2/5] flink git commit: [FLINK-4835] [cluster management] Add
embedded version of the high-availability services
Posted by se...@apache.org.
[FLINK-4835] [cluster management] Add embedded version of the high-availability services
This includes the addition of the EmbeddedLeaderService
and a clean shutdown hook for all high availability services.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/79476624
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/79476624
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/79476624
Branch: refs/heads/flip-6
Commit: 7947662467d102edc675fde19948b7638a044343
Parents: da16b0a
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Oct 14 23:57:11 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Oct 16 22:09:39 2016 +0200
----------------------------------------------------------------------
.../StandaloneCheckpointRecoveryFactory.java | 4 +-
.../highavailability/EmbeddedNonHaServices.java | 62 +++
.../HighAvailabilityServices.java | 7 +-
.../runtime/highavailability/NonHaServices.java | 62 +--
.../highavailability/ZookeeperHaServices.java | 12 +-
.../nonha/AbstractNonHaServices.java | 175 +++++++
.../nonha/EmbeddedLeaderService.java | 466 +++++++++++++++++++
.../TestingHighAvailabilityServices.java | 9 +
8 files changed, 736 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/79476624/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
index a9624fb..57785ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
@@ -40,8 +40,8 @@ public class StandaloneCheckpointRecoveryFactory implements CheckpointRecoveryFa
public CompletedCheckpointStore createCheckpointStore(JobID jobId, ClassLoader userClassLoader)
throws Exception {
- return new StandaloneCompletedCheckpointStore(CheckpointRecoveryFactory
- .NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN);
+ return new StandaloneCompletedCheckpointStore(
+ CheckpointRecoveryFactory.NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/79476624/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
new file mode 100644
index 0000000..58da287
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.runtime.highavailability.nonha.AbstractNonHaServices;
+import org.apache.flink.runtime.highavailability.nonha.EmbeddedLeaderService;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+/**
+ * An implementation of the {@link HighAvailabilityServices} for the non-high-availability case
+ * where all participants (ResourceManager, JobManagers, TaskManagers) run in the same process.
+ *
+ * <p>This implementation has no dependencies on any external services. Instead of a fixed,
+ * pre-configured ResourceManager address, the ResourceManager leader (like each JobManager
+ * leader) is elected via an in-process {@link EmbeddedLeaderService}. Checkpoints and metadata
+ * are stored simply on the heap or on a local file system, and therefore in storage without
+ * guarantees.
+ */
+public class EmbeddedNonHaServices extends AbstractNonHaServices implements HighAvailabilityServices {
+
+	/** Embedded leader election and retrieval for the ResourceManager. */
+	private final EmbeddedLeaderService resourceManagerLeaderService;
+
+	/**
+	 * Creates the services. The ResourceManager leader service is created with the executor
+	 * returned by {@link #getExecutorService()}.
+	 */
+	public EmbeddedNonHaServices() {
+		super();
+		this.resourceManagerLeaderService = new EmbeddedLeaderService(getExecutorService());
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** Creates a leader retrieval service for the ResourceManager. */
+	@Override
+	public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception {
+		return resourceManagerLeaderService.createLeaderRetrievalService();
+	}
+
+	/** Creates a leader election service for the ResourceManager. */
+	@Override
+	public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
+		return resourceManagerLeaderService.createLeaderElectionService();
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Shuts down the inherited services (including the shared executor) and then the
+	 * ResourceManager leader service. The leader service is shut down even if shutting down
+	 * the inherited services fails.
+	 */
+	@Override
+	public void shutdown() throws Exception {
+		try {
+			super.shutdown();
+		} finally {
+			resourceManagerLeaderService.shutdown();
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/79476624/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index 484cddb..f6db682 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -49,11 +49,10 @@ public interface HighAvailabilityServices {
* Gets the leader retriever for the job JobMaster which is responsible for the given job
*
* @param jobID The identifier of the job.
- * @param defaultAddress address under which the job manager is reachable
* @return
* @throws Exception
*/
- LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultAddress) throws Exception;
+ LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception;
/**
* Gets the leader election service for the cluster's resource manager.
@@ -86,4 +85,8 @@ public interface HighAvailabilityServices {
* Creates the BLOB store in which BLOBs are stored in a highly-available fashion.
*/
BlobStore createBlobStore() throws IOException;
+
+ // ------------------------------------------------------------------------
+
+ void shutdown() throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/79476624/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index 1c73c01..107cbd0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -18,21 +18,13 @@
package org.apache.flink.runtime.highavailability;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.blob.BlobStore;
-import org.apache.flink.runtime.blob.VoidBlobStore;
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
-import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
-import org.apache.flink.runtime.highavailability.nonha.NonHaRegistry;
-import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
-import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+import org.apache.flink.runtime.highavailability.nonha.AbstractNonHaServices;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -41,35 +33,23 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* This implementation can be used for testing, and for cluster setups that do not
* tolerate failures of the master processes (JobManager, ResourceManager).
*
- * <p>This implementation has no dependencies on any external services. It returns fix
- * pre-configured leaders, and stores checkpoints and metadata simply on the heap and therefore
- * in volatile memory.
+ * <p>This implementation has no dependencies on any external services. It uses a fixed,
+ * pre-configured ResourceManager address, and stores checkpoints and metadata simply on the
+ * heap or on a local file system, and therefore in storage without guarantees.
*/
-public class NonHaServices implements HighAvailabilityServices {
+public class NonHaServices extends AbstractNonHaServices implements HighAvailabilityServices {
/** The fix address of the ResourceManager */
private final String resourceManagerAddress;
- private final ConcurrentHashMap<JobID, String> jobMastersAddress;
-
/**
* Creates a new services class for the fix pre-defined leaders.
*
* @param resourceManagerAddress The fix address of the ResourceManager
*/
public NonHaServices(String resourceManagerAddress) {
+ super();
this.resourceManagerAddress = checkNotNull(resourceManagerAddress);
- this.jobMastersAddress = new ConcurrentHashMap<>(16);
- }
-
- /**
- * Binds address of a specified job master
- *
- * @param jobID JobID for the specified job master
- * @param jobMasterAddress address for the specified job master
- */
- public void bindJobMasterLeaderAddress(JobID jobID, String jobMasterAddress) {
- jobMastersAddress.put(jobID, jobMasterAddress);
}
// ------------------------------------------------------------------------
@@ -82,37 +62,7 @@ public class NonHaServices implements HighAvailabilityServices {
}
@Override
- public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultAddress) throws Exception {
- return new StandaloneLeaderRetrievalService(defaultAddress, new UUID(0, 0));
- }
-
- @Override
public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
return new StandaloneLeaderElectionService();
}
-
- @Override
- public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception {
- return new StandaloneLeaderElectionService();
- }
-
- @Override
- public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception {
- return new StandaloneCheckpointRecoveryFactory();
- }
-
- @Override
- public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
- return new StandaloneSubmittedJobGraphStore();
- }
-
- @Override
- public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
- return new NonHaRegistry();
- }
-
- @Override
- public BlobStore createBlobStore() {
- return new VoidBlobStore();
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/79476624/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
index bbe8ecb..eae45ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
@@ -85,7 +85,8 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
private static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock";
// ------------------------------------------------------------------------
-
+
+
/** The ZooKeeper client to use */
private final CuratorFramework client;
@@ -168,6 +169,15 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
}
// ------------------------------------------------------------------------
+ // Shutdown
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void shutdown() throws Exception {
+ client.close();
+ }
+
+ // ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/79476624/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
new file mode 100644
index 0000000..8c15a52
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.BlobStore;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Base class for {@link HighAvailabilityServices} implementations that are not highly available:
+ * they are backed by storage without availability guarantees, and by leader election services
+ * that cannot elect among multiple distributed leader contenders.
+ *
+ * <p>JobManager leader election and retrieval are served per job by lazily created
+ * {@link EmbeddedLeaderService}s, which all share one executor of daemon threads.
+ */
+public abstract class AbstractNonHaServices implements HighAvailabilityServices {
+
+	/** Guards the per-job leader services and the shutdown flag. */
+	private final Object lock = new Object();
+
+	/** Executor handed to all embedded leader services; shut down in {@link #shutdown()}. */
+	private final ExecutorService executor;
+
+	/** The embedded JobManager leader service of each job, created on first request. */
+	@GuardedBy("lock")
+	private final HashMap<JobID, EmbeddedLeaderService> jobManagerLeaderServices;
+
+	/** A single registry instance, returned by every call to {@link #getRunningJobsRegistry()}. */
+	private final RunningJobsRegistry runningJobsRegistry;
+
+	/** Set once by {@link #shutdown()}; afterwards all service accessors fail. */
+	@GuardedBy("lock")
+	private boolean shutdown;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates the services together with a cached thread pool of daemon threads, which
+	 * subclasses can obtain via {@link #getExecutorService()}.
+	 */
+	public AbstractNonHaServices() {
+		this.executor = Executors.newCachedThreadPool(new ServicesThreadFactory());
+		this.jobManagerLeaderServices = new HashMap<>();
+		this.runningJobsRegistry = new NonHaRegistry();
+	}
+
+	// ------------------------------------------------------------------------
+	// services
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a leader retrieval service for the JobManager of the given job, backed by that
+	 * job's embedded leader service (created on first use).
+	 *
+	 * @throws IllegalStateException if these services have been shut down
+	 */
+	@Override
+	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception {
+		checkNotNull(jobID);
+
+		synchronized (lock) {
+			checkNotShutdown();
+			EmbeddedLeaderService service = getOrCreateJobManagerService(jobID);
+			return service.createLeaderRetrievalService();
+		}
+	}
+
+	/**
+	 * Creates a leader election service for the JobManager of the given job, backed by that
+	 * job's embedded leader service (created on first use).
+	 *
+	 * @throws IllegalStateException if these services have been shut down
+	 */
+	@Override
+	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception {
+		checkNotNull(jobID);
+
+		synchronized (lock) {
+			checkNotShutdown();
+			EmbeddedLeaderService service = getOrCreateJobManagerService(jobID);
+			return service.createLeaderElectionService();
+		}
+	}
+
+	/** Returns the job's embedded leader service, creating and registering it if absent. */
+	@GuardedBy("lock")
+	private EmbeddedLeaderService getOrCreateJobManagerService(JobID jobID) {
+		EmbeddedLeaderService service = jobManagerLeaderServices.get(jobID);
+		if (service == null) {
+			service = new EmbeddedLeaderService(executor);
+			jobManagerLeaderServices.put(jobID, service);
+		}
+		return service;
+	}
+
+	// The accessors below check the shutdown flag under the lock, like the JobManager accessors
+	// above, because the flag is written under the lock and is not volatile.
+
+	@Override
+	public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception {
+		synchronized (lock) {
+			checkNotShutdown();
+			return new StandaloneCheckpointRecoveryFactory();
+		}
+	}
+
+	@Override
+	public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
+		synchronized (lock) {
+			checkNotShutdown();
+			return new StandaloneSubmittedJobGraphStore();
+		}
+	}
+
+	@Override
+	public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
+		synchronized (lock) {
+			checkNotShutdown();
+			return runningJobsRegistry;
+		}
+	}
+
+	@Override
+	public BlobStore createBlobStore() throws IOException {
+		synchronized (lock) {
+			checkNotShutdown();
+			return new VoidBlobStore();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	// shutdown
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Shuts these services down: first stops the shared executor via
+	 * {@link ExecutorService#shutdownNow()}, then shuts down every JobManager leader service.
+	 * Calls after the first one have no effect.
+	 */
+	@Override
+	public void shutdown() throws Exception {
+		synchronized (lock) {
+			if (!shutdown) {
+				shutdown = true;
+
+				// no further calls should be dispatched
+				executor.shutdownNow();
+
+				// stop all job manager leader services
+				for (EmbeddedLeaderService service : jobManagerLeaderServices.values()) {
+					service.shutdown();
+				}
+				jobManagerLeaderServices.clear();
+			}
+		}
+	}
+
+	/** Throws an {@link IllegalStateException} if {@link #shutdown()} has already been called. */
+	@GuardedBy("lock")
+	private void checkNotShutdown() {
+		checkState(!shutdown, "high availability services are shut down");
+	}
+
+	// ------------------------------------------------------------------------
+	// utilities
+	// ------------------------------------------------------------------------
+
+	/** Returns the executor that this instance hands to its embedded leader services. */
+	protected ExecutorService getExecutorService() {
+		return executor;
+	}
+
+	/** Creates numbered daemon threads for the services' executor. */
+	private static final class ServicesThreadFactory implements ThreadFactory {
+
+		private final AtomicInteger enumerator = new AtomicInteger();
+
+		@Override
+		public Thread newThread(@Nonnull Runnable r) {
+			Thread thread = new Thread(r, "Flink HA services thread #" + enumerator.incrementAndGet());
+			thread.setDaemon(true);
+			return thread;
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/79476624/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
new file mode 100644
index 0000000..84ac551
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha;
+
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A simple leader election service, which selects a leader among contenders and notifies listeners.
+ *
+ * <p>An election service for contenders can be created via {@link #createLeaderElectionService()},
+ * a listener service for leader observers can be created via {@link #createLeaderRetrievalService()}.
+ */
+public class EmbeddedLeaderService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(EmbeddedLeaderService.class);
+
+ private final Object lock = new Object();
+
+ private final Executor notificationExecutor;
+
+ private final Set<EmbeddedLeaderElectionService> allLeaderContenders;
+
+ private final Set<EmbeddedLeaderRetrievalService> listeners;
+
+ /** proposed leader, which has been notified of leadership grant, but has not confirmed */
+ private EmbeddedLeaderElectionService currentLeaderProposed;
+
+ /** actual leader that has confirmed leadership and of which listeners have been notified */
+ private EmbeddedLeaderElectionService currentLeaderConfirmed;
+
+ /** fencing UID for the current leader (or proposed leader) */
+ private UUID currentLeaderSessionId;
+
+ /** the cached address of the current leader */
+ private String currentLeaderAddress;
+
+ /** flag marking the service as terminated */
+ private boolean shutdown;
+
+ // ------------------------------------------------------------------------
+
+ public EmbeddedLeaderService(ExecutorService notificationsDispatcher) {
+ this.notificationExecutor = checkNotNull(notificationsDispatcher);
+ this.allLeaderContenders = new HashSet<>();
+ this.listeners = new HashSet<>();
+ }
+
+ // ------------------------------------------------------------------------
+ // shutdown and errors
+ // ------------------------------------------------------------------------
+
+ /**
+ * Shuts down this leader election service.
+ *
+ * <p>This method does not perform a clean revocation of the leader status and
+ * no notification to any leader listeners. It simply notifies all contenders
+ * and listeners that the service is no longer available.
+ */
+ public void shutdown() {
+ synchronized (lock) {
+ shutdownInternally(new Exception("Leader election service is shutting down"));
+ }
+ }
+
+ private void fatalError(Throwable error) {
+ LOG.error("Embedded leader election service encountered a fatal error. Shutting down service.", error);
+
+ synchronized (lock) {
+ shutdownInternally(new Exception("Leader election service is shutting down after a fatal error", error));
+ }
+ }
+
+ @GuardedBy("lock")
+ private void shutdownInternally(Exception exceptionForHandlers) {
+ assert Thread.holdsLock(lock);
+
+ if (!shutdown) {
+ // clear all leader status
+ currentLeaderProposed = null;
+ currentLeaderConfirmed = null;
+ currentLeaderSessionId = null;
+ currentLeaderAddress = null;
+
+ // fail all registered listeners
+ for (EmbeddedLeaderElectionService service : allLeaderContenders) {
+ service.shutdown(exceptionForHandlers);
+ }
+ allLeaderContenders.clear();
+
+ // fail all registered listeners
+ for (EmbeddedLeaderRetrievalService service : listeners) {
+ service.shutdown(exceptionForHandlers);
+ }
+ listeners.clear();
+
+ shutdown = true;
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // creating contenders and listeners
+ // ------------------------------------------------------------------------
+
+ public LeaderElectionService createLeaderElectionService() {
+ checkState(!shutdown, "leader election service is shut down");
+ return new EmbeddedLeaderElectionService();
+ }
+
+ public LeaderRetrievalService createLeaderRetrievalService() {
+ checkState(!shutdown, "leader election service is shut down");
+ return new EmbeddedLeaderRetrievalService();
+ }
+
+ // ------------------------------------------------------------------------
+ // adding and removing contenders & listeners
+ // ------------------------------------------------------------------------
+
+ /**
+ * Callback from leader contenders when they start their service.
+ */
+ void addContender(EmbeddedLeaderElectionService service, LeaderContender contender) {
+ synchronized (lock) {
+ checkState(!shutdown, "leader election service is shut down");
+ checkState(!service.running, "leader election service is already started");
+
+ try {
+ if (!allLeaderContenders.add(service)) {
+ throw new IllegalStateException("leader election service was added to this service multiple times");
+ }
+
+ service.contender = contender;
+ service.running = true;
+
+ updateLeader();
+ }
+ catch (Throwable t) {
+ fatalError(t);
+ }
+ }
+ }
+
+ /**
+ * Callback from leader contenders when they stop their service.
+ */
+ void removeContender(EmbeddedLeaderElectionService service) {
+ synchronized (lock) {
+ // if the service was not even started, simply do nothing
+ if (!service.running || shutdown) {
+ return;
+ }
+
+ try {
+ if (!allLeaderContenders.remove(service)) {
+ throw new IllegalStateException("leader election service does not belong to this service");
+ }
+
+ // stop the service
+ service.contender = null;
+ service.running = false;
+ service.isLeader = false;
+
+ // if that was the current leader, unset its status
+ if (currentLeaderConfirmed == service) {
+ currentLeaderConfirmed = null;
+ currentLeaderSessionId = null;
+ currentLeaderAddress = null;
+ }
+ if (currentLeaderProposed == service) {
+ currentLeaderProposed = null;
+ currentLeaderSessionId = null;
+ }
+
+ updateLeader();
+ }
+ catch (Throwable t) {
+ fatalError(t);
+ }
+ }
+ }
+
+ /**
+ * Callback from leader contenders when they confirm a leader grant
+ */
+ void confirmLeader(final EmbeddedLeaderElectionService service, final UUID leaderSessionId) {
+ synchronized (lock) {
+ // if the service was shut down in the meantime, ignore this confirmation
+ if (!service.running || shutdown) {
+ return;
+ }
+
+ try {
+ // check if the confirmation is for the same grant, or whether it is a stale grant
+ if (service == currentLeaderProposed && currentLeaderSessionId.equals(leaderSessionId)) {
+ final String address = service.contender.getAddress();
+ LOG.info("Received confirmation of leadership for leader {} / session={}", address, leaderSessionId);
+
+ // mark leadership
+ currentLeaderConfirmed = service;
+ currentLeaderAddress = address;
+ currentLeaderProposed = null;
+ service.isLeader = true;
+
+ // notify all listeners
+ for (EmbeddedLeaderRetrievalService listener : listeners) {
+ notificationExecutor.execute(
+ new NotifyOfLeaderCall(address, leaderSessionId, listener.listener, LOG));
+ }
+ }
+ else {
+ LOG.debug("Received confirmation of leadership for a stale leadership grant. Ignoring.");
+ }
+ }
+ catch (Throwable t) {
+ fatalError(t);
+ }
+ }
+ }
+
+ @GuardedBy("lock")
+ private void updateLeader() {
+ // this must be called under the lock
+ assert Thread.holdsLock(lock);
+
+ if (currentLeaderConfirmed == null && currentLeaderProposed == null) {
+ // we need a new leader
+ if (allLeaderContenders.isEmpty()) {
+ // no new leader available, tell everyone that there is no leader currently
+ for (EmbeddedLeaderRetrievalService listener : listeners) {
+ notificationExecutor.execute(
+ new NotifyOfLeaderCall(null, null, listener.listener, LOG));
+ }
+ }
+ else {
+ // propose a leader and ask it
+ final UUID leaderSessionId = UUID.randomUUID();
+ EmbeddedLeaderElectionService leaderService = allLeaderContenders.iterator().next();
+
+ currentLeaderSessionId = leaderSessionId;
+ currentLeaderProposed = leaderService;
+
+ notificationExecutor.execute(
+ new GrantLeadershipCall(leaderService.contender, leaderSessionId, LOG));
+ }
+ }
+ }
+
+ void addListener(EmbeddedLeaderRetrievalService service, LeaderRetrievalListener listener) {
+ synchronized (lock) {
+ checkState(!shutdown, "leader election service is shut down");
+ checkState(!service.running, "leader retrieval service is already started");
+
+ try {
+ if (!listeners.add(service)) {
+ throw new IllegalStateException("leader retrieval service was added to this service multiple times");
+ }
+
+ service.listener = listener;
+ service.running = true;
+
+ // if we already have a leader, immediately notify this new listener
+ if (currentLeaderConfirmed != null) {
+ notificationExecutor.execute(
+ new NotifyOfLeaderCall(currentLeaderAddress, currentLeaderSessionId, listener, LOG));
+ }
+ }
+ catch (Throwable t) {
+ fatalError(t);
+ }
+ }
+ }
+
+ void removeListener(EmbeddedLeaderRetrievalService service) {
+ synchronized (lock) {
+ // if the service was not even started, simply do nothing
+ if (!service.running || shutdown) {
+ return;
+ }
+
+ try {
+ if (!listeners.remove(service)) {
+ throw new IllegalStateException("leader retrieval service does not belong to this service");
+ }
+
+ // stop the service
+ service.listener = null;
+ service.running = false;
+ }
+ catch (Throwable t) {
+ fatalError(t);
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // election and retrieval service implementations
+ // ------------------------------------------------------------------------
+
+ private class EmbeddedLeaderElectionService implements LeaderElectionService {
+
+ volatile LeaderContender contender;
+
+ volatile boolean isLeader;
+
+ volatile boolean running;
+
+ @Override
+ public void start(LeaderContender contender) throws Exception {
+ checkNotNull(contender);
+ addContender(this, contender);
+ }
+
+ @Override
+ public void stop() throws Exception {
+ removeContender(this);
+ }
+
+ @Override
+ public void confirmLeaderSessionID(UUID leaderSessionID) {
+ checkNotNull(leaderSessionID);
+ confirmLeader(this, leaderSessionID);
+ }
+
+ @Override
+ public boolean hasLeadership() {
+ return isLeader;
+ }
+
+ void shutdown(Exception cause) {
+ if (running) {
+ running = false;
+ isLeader = false;
+ contender.handleError(cause);
+ contender = null;
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ private class EmbeddedLeaderRetrievalService implements LeaderRetrievalService {
+
+ volatile LeaderRetrievalListener listener;
+
+ volatile boolean running;
+
+ @Override
+ public void start(LeaderRetrievalListener listener) throws Exception {
+ checkNotNull(listener);
+ addListener(this, listener);
+ }
+
+ @Override
+ public void stop() throws Exception {
+ removeListener(this);
+ }
+
+ public void shutdown(Exception cause) {
+ if (running) {
+ running = false;
+ listener.handleError(cause);
+ listener = null;
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // asynchronous notifications
+ // ------------------------------------------------------------------------
+
+ private static class NotifyOfLeaderCall implements Runnable {
+
+ @Nullable
+ private final String address; // null if leader revoked without new leader
+ @Nullable
+ private final UUID leaderSessionId; // null if leader revoked without new leader
+
+ private final LeaderRetrievalListener listener;
+ private final Logger logger;
+
+ NotifyOfLeaderCall(
+ @Nullable String address,
+ @Nullable UUID leaderSessionId,
+ LeaderRetrievalListener listener,
+ Logger logger) {
+
+ this.address = address;
+ this.leaderSessionId = leaderSessionId;
+ this.listener = checkNotNull(listener);
+ this.logger = checkNotNull(logger);
+ }
+
+ @Override
+ public void run() {
+ try {
+ listener.notifyLeaderAddress(address, leaderSessionId);
+ }
+ catch (Throwable t) {
+ logger.warn("Error notifying leader listener about new leader", t);
+ listener.handleError(t instanceof Exception ? (Exception) t : new Exception(t));
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static class GrantLeadershipCall implements Runnable {
+
+ private final LeaderContender contender;
+ private final UUID leaderSessionId;
+ private final Logger logger;
+
+ GrantLeadershipCall(
+ LeaderContender contender,
+ UUID leaderSessionId,
+ Logger logger) {
+
+ this.contender = checkNotNull(contender);
+ this.leaderSessionId = checkNotNull(leaderSessionId);
+ this.logger = checkNotNull(logger);
+ }
+
+ @Override
+ public void run() {
+ try {
+ contender.grantLeadership(leaderSessionId);
+ }
+ catch (Throwable t) {
+ logger.warn("Error notifying leader listener about new leader", t);
+ contender.handleError(t instanceof Exception ? (Exception) t : new Exception(t));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/79476624/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 38e372d..3e88e8c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -154,4 +154,13 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
public BlobStore createBlobStore() throws IOException {
return new VoidBlobStore();
}
+
+ // ------------------------------------------------------------------------
+ // Shutdown
+ // ------------------------------------------------------------------------
+
+ /**
+ * Does nothing. Shutting down is meant to release parts shared across services, not the
+ * individual services handed out by this class, and this testing implementation has no
+ * such shared parts to release.
+ */
+ @Override
+ public void shutdown() throws Exception {
+ // intentionally empty, see the method documentation
+ }
}
[3/5] flink git commit: [FLINK-4836] [cluster management] Add flink
mini cluster (part 1)
Posted by se...@apache.org.
[FLINK-4836] [cluster management] Add flink mini cluster (part 1)
This implements
- mini cluster configuration
- startup / shutdown of common services (rpc, ha)
- startup / shutdown of JobManager and Dispatcher
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/655722a2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/655722a2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/655722a2
Branch: refs/heads/flip-6
Commit: 655722a29edcca261653f54fa07fb7b4c94602a6
Parents: 7947662
Author: Stephan Ewen <se...@apache.org>
Authored: Sat Oct 15 00:25:41 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Oct 16 22:14:36 2016 +0200
----------------------------------------------------------------------
.../org/apache/flink/util/ExceptionUtils.java | 62 ++-
.../HighAvailabilityServicesUtils.java | 17 +
.../highavailability/ZookeeperHaServices.java | 2 +-
.../runtime/jobmaster/JobManagerRunner.java | 1 -
.../jobmaster/MiniClusterJobDispatcher.java | 394 -----------------
.../flink/runtime/minicluster/MiniCluster.java | 406 ++++++++++++++++++
.../minicluster/MiniClusterConfiguration.java | 147 +++++++
.../minicluster/MiniClusterJobDispatcher.java | 418 +++++++++++++++++++
.../resourcemanager/ResourceManager.java | 2 +-
.../runtime/taskexecutor/JobLeaderService.java | 3 +-
.../TestingHighAvailabilityServices.java | 2 +-
.../runtime/minicluster/MiniClusterITCase.java | 79 ++++
12 files changed, 1126 insertions(+), 407 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/655722a2/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 0f6f24f..d34b236 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -26,10 +26,13 @@ package org.apache.flink.util;
import org.apache.flink.annotation.Internal;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
@Internal
public final class ExceptionUtils {
@@ -58,7 +61,54 @@ public final class ExceptionUtils {
return e.getClass().getName() + " (error while printing stack trace)";
}
}
-
+
+ /**
+ * Adds a new exception as a {@link Throwable#addSuppressed(Throwable) suppressed exception}
+ * to a prior exception, or returns the new exception, if no prior exception exists.
+ *
+ * <p>If both arguments are the same instance, that instance is returned unchanged, because
+ * a throwable cannot suppress itself.
+ *
+ * <pre>{@code
+ *
+ * public void closeAllThings() throws Exception {
+ * Exception ex = null;
+ * try {
+ * component.shutdown();
+ * } catch (Exception e) {
+ * ex = firstOrSuppressed(e, ex);
+ * }
+ * try {
+ * anotherComponent.stop();
+ * } catch (Exception e) {
+ * ex = firstOrSuppressed(e, ex);
+ * }
+ * try {
+ * lastComponent.shutdown();
+ * } catch (Exception e) {
+ * ex = firstOrSuppressed(e, ex);
+ * }
+ *
+ * if (ex != null) {
+ * throw ex;
+ * }
+ * }
+ * }</pre>
+ *
+ * @param newException The newly occurred exception
+ * @param previous The previously occurred exception, possibly null.
+ *
+ * @return The new exception, if no previous exception exists (or it is the same instance),
+ * or the previous exception with the new exception in the list of suppressed exceptions.
+ *
+ * @throws NullPointerException If {@code newException} is null.
+ */
+ public static <T extends Throwable> T firstOrSuppressed(T newException, @Nullable T previous) {
+ checkNotNull(newException, "newException");
+
+ if (previous == null || previous == newException) {
+ // no prior exception, or the same instance failed again:
+ // Throwable#addSuppressed rejects self-suppression with an IllegalArgumentException
+ return newException;
+ } else {
+ previous.addSuppressed(newException);
+ return previous;
+ }
+ }
+
/**
* Throws the given {@code Throwable} in scenarios where the signatures do not allow you to
* throw an arbitrary Throwable. Errors and RuntimeExceptions are thrown directly, other exceptions
@@ -161,10 +211,8 @@ public final class ExceptionUtils {
}
}
- /**
- * Private constructor to prevent instantiation.
- */
- private ExceptionUtils() {
- throw new RuntimeException();
- }
+ // ------------------------------------------------------------------------
+
+ /**
+ * Private constructor to prevent instantiation; this class only offers static utility methods.
+ */
+ private ExceptionUtils() {
+ // no instances
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/655722a2/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index f3da847..9113309 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -24,6 +24,23 @@ import org.apache.flink.runtime.util.LeaderRetrievalUtils;
public class HighAvailabilityServicesUtils {
+ /**
+ * Creates the high availability services for the mode configured in the given configuration.
+ * If high availability is disabled ({@code NONE}), embedded non-HA services are created.
+ *
+ * @param config The configuration from which the high availability mode is read.
+ * @return The high availability services for the configured mode.
+ *
+ * @throws UnsupportedOperationException If ZooKeeper high availability is configured (not implemented yet).
+ * @throws Exception If the configured high availability mode is not supported.
+ */
+ public static HighAvailabilityServices createAvailableOrEmbeddedServices(Configuration config) throws Exception {
+ final HighAvailabilityMode mode = LeaderRetrievalUtils.getRecoveryMode(config);
+
+ switch (mode) {
+ case NONE:
+ // high availability disabled
+ return new EmbeddedNonHaServices();
+
+ case ZOOKEEPER:
+ throw new UnsupportedOperationException(
+ "ZooKeeper high availability services have not been implemented yet.");
+
+ default:
+ throw new Exception("High availability mode " + mode + " is not supported.");
+ }
+ }
+
+
public static HighAvailabilityServices createHighAvailabilityServices(Configuration configuration) throws Exception {
HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(configuration);
http://git-wip-us.apache.org/repos/asf/flink/blob/655722a2/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
index eae45ab..a9d2610 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
@@ -112,7 +112,7 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
}
@Override
- public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultAddress) throws Exception {
+ public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception {
return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathForJobManager(jobID));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/655722a2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 74ca6f3..3313d8a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -289,7 +289,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
@Override
public void jobFinishedByOther() {
try {
- unregisterJobFromHighAvailability();
shutdownInternally();
}
finally {
http://git-wip-us.apache.org/repos/asf/flink/blob/655722a2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
deleted file mode 100644
index 019ccfe..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
+++ /dev/null
@@ -1,394 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmaster;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmanager.OnCompletionActions;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-/**
- * The dispatcher that runs in the mini cluster, waits for jobs, and starts job masters
- * upon receiving jobs.
- */
-public class MiniClusterJobDispatcher {
-
- private static final Logger LOG = LoggerFactory.getLogger(MiniClusterJobDispatcher.class);
-
- // ------------------------------------------------------------------------
-
- /** lock to ensure that this dispatcher executes only one job at a time */
- private final Object lock = new Object();
-
- /** the configuration with which the mini cluster was started */
- private final Configuration configuration;
-
- /** the RPC service to use by the job managers */
- private final RpcService rpcService;
-
- /** services for discovery, leader election, and recovery */
- private final HighAvailabilityServices haServices;
-
- /** al the services that the JobManager needs, such as BLOB service, factories, etc */
- private final JobManagerServices jobManagerServices;
-
- /** Registry for all metrics in the mini cluster */
- private final MetricRegistry metricRegistry;
-
- /** The number of JobManagers to launch (more than one simulates a high-availability setup) */
- private final int numJobManagers;
-
- /** The runner for the job and master. non-null if a job is currently running */
- private volatile JobManagerRunner[] runners;
-
- /** flag marking the dispatcher as hut down */
- private volatile boolean shutdown;
-
-
- /**
- * Starts a mini cluster job dispatcher.
- *
- * <p>The dispatcher kicks off one JobManager per job, a behavior similar to a
- * non-highly-available setup.
- *
- * @param config The configuration of the mini cluster
- * @param haServices Access to the discovery, leader election, and recovery services
- *
- * @throws Exception Thrown, if the services for the JobMaster could not be started.
- */
- public MiniClusterJobDispatcher(
- Configuration config,
- RpcService rpcService,
- HighAvailabilityServices haServices,
- MetricRegistry metricRegistry) throws Exception {
- this(config, rpcService, haServices, metricRegistry, 1);
- }
-
- /**
- * Starts a mini cluster job dispatcher.
- *
- * <p>The dispatcher may kick off more than one JobManager per job, thus simulating
- * a highly-available setup.
- *
- * @param config The configuration of the mini cluster
- * @param haServices Access to the discovery, leader election, and recovery services
- * @param numJobManagers The number of JobMasters to start for each job.
- *
- * @throws Exception Thrown, if the services for the JobMaster could not be started.
- */
- public MiniClusterJobDispatcher(
- Configuration config,
- RpcService rpcService,
- HighAvailabilityServices haServices,
- MetricRegistry metricRegistry,
- int numJobManagers) throws Exception {
-
- checkArgument(numJobManagers >= 1);
- this.configuration = checkNotNull(config);
- this.rpcService = checkNotNull(rpcService);
- this.haServices = checkNotNull(haServices);
- this.metricRegistry = checkNotNull(metricRegistry);
- this.numJobManagers = numJobManagers;
-
- LOG.info("Creating JobMaster services");
- this.jobManagerServices = JobManagerServices.fromConfiguration(config, haServices);
- }
-
- // ------------------------------------------------------------------------
- // life cycle
- // ------------------------------------------------------------------------
-
- /**
- * Shuts down the mini cluster dispatcher. If a job is currently running, that job will be
- * terminally failed.
- */
- public void shutdown() {
- synchronized (lock) {
- if (!shutdown) {
- shutdown = true;
-
- LOG.info("Shutting down the dispatcher");
-
- // in this shutdown code we copy the references to the stack first,
- // to avoid concurrent modification
-
- JobManagerRunner[] runners = this.runners;
- if (runners != null) {
- this.runners = null;
-
- for (JobManagerRunner runner : runners) {
- runner.shutdown();
- }
- }
- }
- }
- }
-
- // ------------------------------------------------------------------------
- // submitting jobs
- // ------------------------------------------------------------------------
-
- /**
- * This method executes a job in detached mode. The method returns immediately after the job
- * has been added to the
- *
- * @param job The Flink job to execute
- *
- * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
- * or if the job terminally failed.
- */
- public void runDetached(JobGraph job) throws JobExecutionException {
- checkNotNull(job);
-
- LOG.info("Received job for detached execution {} ({})", job.getName(), job.getJobID());
-
- synchronized (lock) {
- checkState(!shutdown, "mini cluster is shut down");
- checkState(runners == null, "mini cluster can only execute one job at a time");
-
- DetachedFinalizer finalizer = new DetachedFinalizer(numJobManagers);
-
- this.runners = startJobRunners(job, finalizer, finalizer);
- }
- }
-
- /**
- * This method runs a job in blocking mode. The method returns only after the job
- * completed successfully, or after it failed terminally.
- *
- * @param job The Flink job to execute
- * @return The result of the job execution
- *
- * @throws JobExecutionException Thrown if anything went amiss during initial job lauch,
- * or if the job terminally failed.
- */
- public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
- checkNotNull(job);
-
- LOG.info("Received job for blocking execution {} ({})", job.getName(), job.getJobID());
- final BlockingJobSync sync = new BlockingJobSync(job.getJobID(), numJobManagers);
-
- synchronized (lock) {
- checkState(!shutdown, "mini cluster is shut down");
- checkState(runners == null, "mini cluster can only execute one job at a time");
-
- this.runners = startJobRunners(job, sync, sync);
- }
-
- try {
- return sync.getResult();
- }
- finally {
- // always clear the status for the next job
- runners = null;
- }
- }
-
- /**
- * Creates and starts one JobManagerRunner per JobManager for the given job.
- * If creating or starting any runner fails, the cleanup loop is meant to shut down
- * the runners created so far before a JobExecutionException is thrown.
- *
- * NOTE(review): the cleanup loop iterates 'k' but shuts down 'runners[i]' on every
- * iteration, so runners 0..i-1 are never shut down. It most likely should use runners[k].
- */
- private JobManagerRunner[] startJobRunners(
- JobGraph job,
- OnCompletionActions onCompletion,
- FatalErrorHandler errorHandler) throws JobExecutionException {
- LOG.info("Starting {} JobMaster(s) for job {} ({})", numJobManagers, job.getName(), job.getJobID());
-
- JobManagerRunner[] runners = new JobManagerRunner[numJobManagers];
- for (int i = 0; i < numJobManagers; i++) {
- try {
- runners[i] = new JobManagerRunner(job, configuration,
- rpcService, haServices, jobManagerServices, metricRegistry,
- onCompletion, errorHandler);
- runners[i].start();
- }
- catch (Throwable t) {
- // shut down all the ones so far
- // NOTE(review): indexes runners[i] instead of runners[k] (see method comment)
- for (int k = 0; k <= i; k++) {
- try {
- if (runners[i] != null) {
- runners[i].shutdown();
- }
- } catch (Throwable ignored) {
- // silent shutdown
- }
- }
-
- throw new JobExecutionException(job.getJobID(), "Could not start the JobManager(s) for the job", t);
- }
- }
-
- return runners;
- }
-
- // ------------------------------------------------------------------------
- // test methods to simulate job master failures
- // ------------------------------------------------------------------------
-
-// public void killJobMaster(int which) {
-// checkArgument(which >= 0 && which < numJobManagers, "no such job master");
-// checkState(!shutdown, "mini cluster is shut down");
-//
-// JobManagerRunner[] runners = this.runners;
-// checkState(runners != null, "mini cluster it not executing a job right now");
-//
-// runners[which].shutdown(new Throwable("kill JobManager"));
-// }
-
- // ------------------------------------------------------------------------
- // utility classes
- // ------------------------------------------------------------------------
-
- /**
- * Simple class that waits for all runners to have reported that they are done.
- * In the case of a high-availability test setup, there may be multiple runners.
- * After that, it marks the mini cluster as ready to receive new jobs by clearing
- * the dispatcher's 'runners' field.
- *
- * <p>Every callback (finished, failed, finished by other, fatal error) counts as one
- * report. NOTE(review): if a single runner reports more than once (e.g. a fatal error
- * and a failure), the counter reaches zero early - confirm this cannot happen.
- */
- private class DetachedFinalizer implements OnCompletionActions, FatalErrorHandler {
-
- // number of runners that still have to report back
- private final AtomicInteger numJobManagersToWaitFor;
-
- private DetachedFinalizer(int numJobManagersToWaitFor) {
- this.numJobManagersToWaitFor = new AtomicInteger(numJobManagersToWaitFor);
- }
-
- @Override
- public void jobFinished(JobExecutionResult result) {
- decrementCheckAndCleanup();
- }
-
- @Override
- public void jobFailed(Throwable cause) {
- decrementCheckAndCleanup();
- }
-
- @Override
- public void jobFinishedByOther() {
- decrementCheckAndCleanup();
- }
-
- @Override
- public void onFatalError(Throwable exception) {
- // a fatal error of a runner also counts as that runner being done
- decrementCheckAndCleanup();
- }
-
- // counts one report; the last report frees the dispatcher for the next job
- private void decrementCheckAndCleanup() {
- if (numJobManagersToWaitFor.decrementAndGet() == 0) {
- MiniClusterJobDispatcher.this.runners = null;
- }
- }
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * This class is used to sync on blocking jobs across multiple runners.
- * Only after all runners reported back that they are finished (via jobFinished,
- * jobFailed, or jobFinishedByOther), the result will be released.
- *
- * That way it is guaranteed that after the blocking job submit call returns,
- * the dispatcher is immediately free to accept another job.
- *
- * NOTE(review): onFatalError only records the exception and does not count down the
- * latch, so a runner that reports nothing but a fatal error keeps getResult() waiting.
- * Confirm that runners always also report completion or failure.
- */
- private static class BlockingJobSync implements OnCompletionActions, FatalErrorHandler {
-
- private final JobID jobId;
-
- // one count per runner that still has to report completion or failure
- private final CountDownLatch jobMastersToWaitFor;
-
- // failure cause reported via jobFailed (a later report overwrites an earlier one)
- private volatile Throwable jobException;
-
- // first fatal error reported via onFatalError
- private volatile Throwable runnerException;
-
- // result reported via jobFinished
- private volatile JobExecutionResult result;
-
- BlockingJobSync(JobID jobId, int numJobMastersToWaitFor) {
- this.jobId = jobId;
- this.jobMastersToWaitFor = new CountDownLatch(numJobMastersToWaitFor);
- }
-
- @Override
- public void jobFinished(JobExecutionResult jobResult) {
- this.result = jobResult;
- jobMastersToWaitFor.countDown();
- }
-
- @Override
- public void jobFailed(Throwable cause) {
- jobException = cause;
- jobMastersToWaitFor.countDown();
- }
-
- @Override
- public void jobFinishedByOther() {
- this.jobMastersToWaitFor.countDown();
- }
-
- @Override
- public void onFatalError(Throwable exception) {
- // keep only the first fatal error; this does not count down the latch
- if (runnerException == null) {
- runnerException = exception;
- }
- }
-
- // blocks until every runner has reported, then maps the reports to a result or exception
- public JobExecutionResult getResult() throws JobExecutionException, InterruptedException {
- jobMastersToWaitFor.await();
-
- final Throwable jobFailureCause = this.jobException;
- final Throwable runnerException = this.runnerException;
- final JobExecutionResult result = this.result;
-
- // (1) we check if the job terminated with an exception
- // (2) we check whether the job completed successfully
- // (3) we check if we have exceptions from the JobManagers. the job may still have
- // completed successfully in that case, if multiple JobMasters were running
- // and other took over. only if all encounter a fatal error, the job cannot finish
-
- if (jobFailureCause != null) {
- if (jobFailureCause instanceof JobExecutionException) {
- throw (JobExecutionException) jobFailureCause;
- }
- else {
- throw new JobExecutionException(jobId, "The job execution failed", jobFailureCause);
- }
- }
- else if (result != null) {
- return result;
- }
- else if (runnerException != null) {
- throw new JobExecutionException(jobId,
- "The job execution failed because all JobManagers encountered fatal errors", runnerException);
- }
- else {
- throw new IllegalStateException("Bug: Job finished with neither error nor result.");
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/655722a2/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
new file mode 100644
index 0000000..1ee38e0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import akka.actor.ActorSystem;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.util.ExceptionUtils;
+
+import scala.Option;
+import scala.Tuple2;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+
+/**
+ * Hosts the metric registry, RPC services, high-availability services, and job dispatcher
+ * of a Flink cluster within the current process.
+ *
+ * <p>The cluster is brought up via {@link #start()} and torn down via {@link #shutdown()};
+ * it can be started again after it has been shut down.
+ */
+public class MiniCluster {
+
+	/** The lock to guard startup / shutdown / manipulation methods. */
+	private final Object lock = new Object();
+
+	/** The configuration for this mini cluster. */
+	private final MiniClusterConfiguration config;
+
+	@GuardedBy("lock")
+	private MetricRegistry metricRegistry;
+
+	/** The RPC service shared by all components; only set when a single RPC service is used. */
+	@GuardedBy("lock")
+	private RpcService commonRpcService;
+
+	/** One RPC service per JobManager; only set when each component gets its own RPC service. */
+	@GuardedBy("lock")
+	private RpcService[] jobManagerRpcServices;
+
+	/** One RPC service per TaskManager; only set when each component gets its own RPC service. */
+	@GuardedBy("lock")
+	private RpcService[] taskManagerRpcServices;
+
+	@GuardedBy("lock")
+	private HighAvailabilityServices haServices;
+
+	@GuardedBy("lock")
+	private MiniClusterJobDispatcher jobDispatcher;
+
+	/**
+	 * Flag marking the mini cluster as started/running. Only written while holding the lock;
+	 * volatile so that {@link #isRunning()} can read it without acquiring the lock.
+	 */
+	private volatile boolean running;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new mini cluster with the default configuration:
+	 * <ul>
+	 *     <li>One JobManager</li>
+	 *     <li>One TaskManager</li>
+	 *     <li>One task slot in the TaskManager</li>
+	 *     <li>All components share the same RPC subsystem (minimizes communication overhead)</li>
+	 * </ul>
+	 */
+	public MiniCluster() {
+		this(new MiniClusterConfiguration());
+	}
+
+	/**
+	 * Creates a new mini cluster based on the given mini cluster configuration.
+	 *
+	 * @param config The configuration for the mini cluster
+	 */
+	public MiniCluster(MiniClusterConfiguration config) {
+		this.config = checkNotNull(config, "config may not be null");
+	}
+
+	/**
+	 * Creates a mini cluster based on the given configuration.
+	 *
+	 * @deprecated Use {@link #MiniCluster(MiniClusterConfiguration)} instead.
+	 * @see #MiniCluster(MiniClusterConfiguration)
+	 */
+	@Deprecated
+	public MiniCluster(Configuration config) {
+		this(createConfig(config, true));
+	}
+
+	/**
+	 * Creates a mini cluster based on the given configuration, starting one or more
+	 * RPC services, depending on the given flag.
+	 *
+	 * @deprecated Use {@link #MiniCluster(MiniClusterConfiguration)} instead.
+	 * @see #MiniCluster(MiniClusterConfiguration)
+	 */
+	@Deprecated
+	public MiniCluster(Configuration config, boolean singleRpcService) {
+		this(createConfig(config, singleRpcService));
+	}
+
+	// ------------------------------------------------------------------------
+	//  life cycle
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Checks if the mini cluster was started and is running.
+	 */
+	public boolean isRunning() {
+		return running;
+	}
+
+	/**
+	 * Starts the mini cluster, based on the configured properties.
+	 *
+	 * <p>If starting any component fails, all components started so far are shut down
+	 * again before the exception is rethrown.
+	 *
+	 * @throws Exception This method passes on any exception that occurs during the startup of
+	 *                   the mini cluster.
+	 */
+	public void start() throws Exception {
+		synchronized (lock) {
+			checkState(!running, "FlinkMiniCluster is already running");
+
+			final Configuration configuration = new UnmodifiableConfiguration(config.getConfiguration());
+			final Time rpcTimeout = config.getRpcTimeout();
+			final int numJobManagers = config.getNumJobManagers();
+			final int numTaskManagers = config.getNumTaskManagers();
+			final boolean singleRpc = config.getUseSingleRpcSystem();
+
+			try {
+				metricRegistry = createMetricRegistry(configuration);
+
+				RpcService[] jobManagerRpcServices = new RpcService[numJobManagers];
+				RpcService[] taskManagerRpcServices = new RpcService[numTaskManagers];
+
+				// bring up all the RPC services
+				if (singleRpc) {
+					// one common RPC for all
+					commonRpcService = createRpcService(configuration, rpcTimeout, false, null);
+
+					// set that same RPC service for all JobManagers and TaskManagers.
+					// the per-component fields stay unset, so shutdown stops the common
+					// service only once (through 'commonRpcService')
+					for (int i = 0; i < numJobManagers; i++) {
+						jobManagerRpcServices[i] = commonRpcService;
+					}
+					for (int i = 0; i < numTaskManagers; i++) {
+						taskManagerRpcServices[i] = commonRpcService;
+					}
+				}
+				else {
+					// start a new service per component, possibly with custom bind addresses
+					final String jobManagerBindAddress = config.getJobManagerBindAddress();
+					final String taskManagerBindAddress = config.getTaskManagerBindAddress();
+
+					// publish the arrays before filling them, so that if creating one of the
+					// services fails, shutdownInternally() still stops the ones started so far
+					// (not-yet-created slots are null, which shutDownRpc() skips)
+					this.jobManagerRpcServices = jobManagerRpcServices;
+					this.taskManagerRpcServices = taskManagerRpcServices;
+
+					for (int i = 0; i < numJobManagers; i++) {
+						jobManagerRpcServices[i] = createRpcService(
+								configuration, rpcTimeout, true, jobManagerBindAddress);
+					}
+
+					for (int i = 0; i < numTaskManagers; i++) {
+						taskManagerRpcServices[i] = createRpcService(
+								configuration, rpcTimeout, true, taskManagerBindAddress);
+					}
+				}
+
+				// create the high-availability services
+				haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration);
+
+				// bring up the dispatcher that launches JobManagers when jobs submitted
+				jobDispatcher = new MiniClusterJobDispatcher(
+						configuration, haServices, metricRegistry, numJobManagers, jobManagerRpcServices);
+			}
+			catch (Exception e) {
+				// cleanup everything
+				try {
+					shutdownInternally();
+				} catch (Exception ee) {
+					e.addSuppressed(ee);
+				}
+				throw e;
+			}
+
+			// now officially mark this as running
+			running = true;
+		}
+	}
+
+	/**
+	 * Shuts down the mini cluster, failing all currently executing jobs.
+	 * The mini cluster can be started again by calling the {@link #start()} method again.
+	 *
+	 * <p>This method shuts down all started services and components,
+	 * even if an exception occurs in the process of shutting down some component.
+	 *
+	 * @throws Exception Thrown, if the shutdown did not complete cleanly.
+	 */
+	public void shutdown() throws Exception {
+		synchronized (lock) {
+			if (running) {
+				try {
+					shutdownInternally();
+				} finally {
+					running = false;
+				}
+			}
+		}
+	}
+
+	/**
+	 * Stops every component that is currently set, continuing past failures. The first
+	 * failure is rethrown at the end, with all later failures attached as suppressed.
+	 */
+	@GuardedBy("lock")
+	private void shutdownInternally() throws Exception {
+		// this should always be called under the lock
+		assert Thread.holdsLock(lock);
+
+		// collect the first exception, but continue and add all successive
+		// exceptions as suppressed
+		Throwable exception = null;
+
+		// cancel all jobs and shut down the job dispatcher
+		if (jobDispatcher != null) {
+			try {
+				jobDispatcher.shutdown();
+			} catch (Exception e) {
+				exception = firstOrSuppressed(e, exception);
+			}
+			jobDispatcher = null;
+		}
+
+		// shut down high-availability services
+		if (haServices != null) {
+			try {
+				haServices.shutdown();
+			} catch (Exception e) {
+				exception = firstOrSuppressed(e, exception);
+			}
+			haServices = null;
+		}
+
+		// shut down the RpcServices
+		if (commonRpcService != null) {
+			exception = shutDownRpc(commonRpcService, exception);
+			commonRpcService = null;
+		}
+		if (jobManagerRpcServices != null) {
+			for (RpcService service : jobManagerRpcServices) {
+				exception = shutDownRpc(service, exception);
+			}
+			jobManagerRpcServices = null;
+		}
+		if (taskManagerRpcServices != null) {
+			for (RpcService service : taskManagerRpcServices) {
+				exception = shutDownRpc(service, exception);
+			}
+			taskManagerRpcServices = null;
+		}
+
+		// metrics shutdown - guarded like the other components, so that a failure here
+		// does not discard the exceptions collected above
+		if (metricRegistry != null) {
+			try {
+				metricRegistry.shutdown();
+			} catch (Throwable t) {
+				exception = firstOrSuppressed(t, exception);
+			}
+			metricRegistry = null;
+		}
+
+		// if anything went wrong, throw the first error with all the additional suppressed exceptions
+		if (exception != null) {
+			ExceptionUtils.rethrowException(exception, "Error while shutting down mini cluster");
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  running jobs
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This method executes a job in detached mode. The method returns immediately after the job
+	 * has been handed to the mini cluster's job dispatcher, without waiting for it to finish.
+	 *
+	 * @param job The Flink job to execute
+	 *
+	 * @throws JobExecutionException Thrown if anything went amiss during initial job launch.
+	 */
+	public void runDetached(JobGraph job) throws JobExecutionException {
+		checkNotNull(job, "job is null");
+
+		synchronized (lock) {
+			checkState(running, "mini cluster is not running");
+			jobDispatcher.runDetached(job);
+		}
+	}
+
+	/**
+	 * This method runs a job in blocking mode. The method returns only after the job
+	 * completed successfully, or after it failed terminally.
+	 *
+	 * @param job The Flink job to execute
+	 * @return The result of the job execution
+	 *
+	 * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
+	 *         or if the job terminally failed.
+	 * @throws InterruptedException Thrown if the calling thread is interrupted while waiting.
+	 */
+	public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
+		checkNotNull(job, "job is null");
+
+		// only look up the dispatcher under the lock; waiting for the job happens outside of it
+		MiniClusterJobDispatcher dispatcher;
+		synchronized (lock) {
+			checkState(running, "mini cluster is not running");
+			dispatcher = this.jobDispatcher;
+		}
+
+		return dispatcher.runJobBlocking(job);
+	}
+
+	// ------------------------------------------------------------------------
+	//  factories - can be overridden by subclasses to alter behavior
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Factory method to create the metric registry for the mini cluster.
+	 *
+	 * @param config The configuration of the mini cluster
+	 * @return The metric registry for the mini cluster
+	 */
+	protected MetricRegistry createMetricRegistry(Configuration config) {
+		return new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+	}
+
+	/**
+	 * Factory method to instantiate the RPC service.
+	 *
+	 * @param config
+	 *            The configuration of the mini cluster
+	 * @param askTimeout
+	 *            The default RPC timeout for asynchronous "ask" requests.
+	 * @param remoteEnabled
+	 *            True, if the RPC service should be reachable from other (remote) RPC services.
+	 * @param bindAddress
+	 *            The address to bind the RPC service to. Only relevant when "remoteEnabled" is true.
+	 *
+	 * @return The instantiated RPC service
+	 */
+	protected RpcService createRpcService(
+			Configuration config,
+			Time askTimeout,
+			boolean remoteEnabled,
+			String bindAddress) {
+
+		ActorSystem actorSystem;
+		if (remoteEnabled) {
+			// port 0: let the actor system pick a free port
+			Tuple2<String, Object> remoteSettings = new Tuple2<String, Object>(bindAddress, 0);
+			actorSystem = AkkaUtils.createActorSystem(config, Option.apply(remoteSettings));
+		} else {
+			actorSystem = AkkaUtils.createLocalActorSystem(config);
+		}
+
+		return new AkkaRpcService(actorSystem, askTimeout);
+	}
+
+	// ------------------------------------------------------------------------
+	//  miscellaneous utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Stops the given RPC service (if non-null) and returns the prior exception, with any
+	 * failure of this shutdown merged in via {@code firstOrSuppressed}.
+	 */
+	private static Throwable shutDownRpc(RpcService rpcService, Throwable priorException) {
+		try {
+			if (rpcService != null) {
+				rpcService.stopService();
+			}
+			return priorException;
+		}
+		catch (Throwable t) {
+			return firstOrSuppressed(t, priorException);
+		}
+	}
+
+	/**
+	 * Builds a mini cluster configuration from a (possibly null) Flink configuration.
+	 */
+	private static MiniClusterConfiguration createConfig(Configuration cfg, boolean singleActorSystem) {
+		MiniClusterConfiguration config = cfg == null ?
+				new MiniClusterConfiguration() :
+				new MiniClusterConfiguration(cfg);
+
+		if (!singleActorSystem) {
+			config.setUseRpcServicePerComponent();
+		}
+
+		return config;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/655722a2/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
new file mode 100644
index 0000000..a8d7b10
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Settings for a {@link MiniCluster}: the number of JobManagers and TaskManagers, whether
+ * all components share a single RPC service, an optional bind address shared by all
+ * components, and the underlying Flink {@link Configuration}.
+ */
+public class MiniClusterConfiguration {
+
+	/** Private copy of the Flink configuration. */
+	private final Configuration config;
+
+	/** True if all components share one RPC service, false if each gets its own. */
+	private boolean singleRpcService = true;
+
+	private int numJobManagers = 1;
+
+	private int numTaskManagers = 1;
+
+	/** When non-null, used as the bind address of JobManagers and TaskManagers alike. */
+	private String commonBindAddress;
+
+	// ------------------------------------------------------------------------
+	//  Construction
+	// ------------------------------------------------------------------------
+
+	/** Creates a mini cluster configuration on top of an empty Flink configuration. */
+	public MiniClusterConfiguration() {
+		config = new Configuration();
+	}
+
+	/**
+	 * Creates a mini cluster configuration on top of a copy of the given Flink configuration.
+	 *
+	 * @param config The Flink configuration to copy; must not be null
+	 */
+	public MiniClusterConfiguration(Configuration config) {
+		this.config = new Configuration(checkNotNull(config));
+	}
+
+	// ------------------------------------------------------------------------
+	//  setters
+	// ------------------------------------------------------------------------
+
+	/** Copies all entries of the given Flink configuration into this configuration. */
+	public void addConfiguration(Configuration config) {
+		final Configuration additions = checkNotNull(config, "configuration must not be null");
+		this.config.addAll(additions);
+	}
+
+	/** Lets all components share one RPC service. */
+	public void setUseSingleRpcService() {
+		singleRpcService = true;
+	}
+
+	/** Gives every component its own RPC service. */
+	public void setUseRpcServicePerComponent() {
+		singleRpcService = false;
+	}
+
+	public void setNumJobManagers(int numJobManagers) {
+		checkArgument(numJobManagers >= 1, "must have at least one JobManager");
+		this.numJobManagers = numJobManagers;
+	}
+
+	public void setNumTaskManagers(int numTaskManagers) {
+		checkArgument(numTaskManagers >= 1, "must have at least one TaskManager");
+		this.numTaskManagers = numTaskManagers;
+	}
+
+	/** Stores the number of slots per TaskManager in the underlying Flink configuration. */
+	public void setNumTaskManagerSlots(int numTaskSlots) {
+		checkArgument(numTaskSlots >= 1, "must have at least one task slot per TaskManager");
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numTaskSlots);
+	}
+
+	/** Sets a bind address that overrides the configured JobManager and TaskManager addresses. */
+	public void setCommonRpcBindAddress(String bindAddress) {
+		commonBindAddress = checkNotNull(bindAddress, "bind address must not be null");
+	}
+
+	// ------------------------------------------------------------------------
+	//  getters
+	// ------------------------------------------------------------------------
+
+	public Configuration getConfiguration() {
+		return config;
+	}
+
+	public boolean getUseSingleRpcSystem() {
+		return singleRpcService;
+	}
+
+	public int getNumJobManagers() {
+		return numJobManagers;
+	}
+
+	public int getNumTaskManagers() {
+		return numTaskManagers;
+	}
+
+	/** Returns the configured number of slots per TaskManager, defaulting to 1. */
+	public int getNumSlotsPerTaskManager() {
+		return config.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+	}
+
+	public String getJobManagerBindAddress() {
+		return resolveBindAddress(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY);
+	}
+
+	public String getTaskManagerBindAddress() {
+		return resolveBindAddress(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY);
+	}
+
+	/** Derives the RPC timeout from the Akka timeout in the Flink configuration. */
+	public Time getRpcTimeout() {
+		final FiniteDuration askTimeout = AkkaUtils.getTimeout(config);
+		return Time.of(askTimeout.length(), askTimeout.unit());
+	}
+
+	/**
+	 * Returns the common bind address if one was set; otherwise the value configured
+	 * under the given key, or "localhost" if that key is absent.
+	 */
+	private String resolveBindAddress(String configKey) {
+		if (commonBindAddress != null) {
+			return commonBindAddress;
+		}
+		return config.getString(configKey, "localhost");
+	}
+
+	// ------------------------------------------------------------------------
+	//  utils
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return new StringBuilder("MiniClusterConfiguration{")
+				.append("singleRpcService=").append(singleRpcService)
+				.append(", numJobManagers=").append(numJobManagers)
+				.append(", numTaskManagers=").append(numTaskManagers)
+				.append(", commonBindAddress='").append(commonBindAddress).append('\'')
+				.append(", config=").append(config)
+				.append('}')
+				.toString();
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/655722a2/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
new file mode 100644
index 0000000..d99eff6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The dispatcher that runs in the mini cluster, waits for jobs, and starts job masters
+ * upon receiving jobs.
+ */
+public class MiniClusterJobDispatcher {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MiniClusterJobDispatcher.class);
+
+ // ------------------------------------------------------------------------
+
+ /** lock guarding job submission and shutdown; ensures this dispatcher executes only one job at a time */
+ private final Object lock = new Object();
+
+ /** the configuration with which the mini cluster was started */
+ private final Configuration configuration;
+
+ /** the RPC services to be used by the JobManagers, one per JobManager */
+ private final RpcService[] rpcServices;
+
+ /** services for discovery, leader election, and recovery */
+ private final HighAvailabilityServices haServices;
+
+ /** all the services that the JobManager needs, such as BLOB service, factories, etc */
+ private final JobManagerServices jobManagerServices;
+
+ /** Registry for all metrics in the mini cluster */
+ private final MetricRegistry metricRegistry;
+
+ /** The number of JobManagers to launch (more than one simulates a high-availability setup) */
+ private final int numJobManagers;
+
+ /** The runners of the current job, one per JobManager. Non-null while a job is running.
+ * Volatile because it is also cleared outside the lock (e.g. at the end of runJobBlocking). */
+ private volatile JobManagerRunner[] runners;
+
+ /** flag marking the dispatcher as shut down */
+ private volatile boolean shutdown;
+
+
+	/**
+	 * Creates a dispatcher that starts a single JobManager per job, mirroring a setup
+	 * without high availability.
+	 *
+	 * @param config The configuration of the mini cluster
+	 * @param rpcService The RPC service used by the JobManager
+	 * @param haServices Access to the discovery, leader election, and recovery services
+	 * @param metricRegistry The registry for all metrics of the mini cluster
+	 *
+	 * @throws Exception Thrown, if the services for the JobMaster could not be started.
+	 */
+	public MiniClusterJobDispatcher(
+			Configuration config,
+			RpcService rpcService,
+			HighAvailabilityServices haServices,
+			MetricRegistry metricRegistry) throws Exception {
+		this(
+				config,
+				haServices,
+				metricRegistry,
+				1,
+				new RpcService[] {rpcService});
+	}
+
+	/**
+	 * Starts a mini cluster job dispatcher.
+	 *
+	 * <p>The dispatcher may kick off more than one JobManager per job, thus simulating
+	 * a highly-available setup.
+	 *
+	 * @param config The configuration of the mini cluster
+	 * @param haServices Access to the discovery, leader election, and recovery services
+	 * @param metricRegistry The registry for all metrics of the mini cluster
+	 * @param numJobManagers The number of JobMasters to start for each job.
+	 * @param rpcServices The RPC services for the JobManagers, exactly one per JobManager.
+	 *                    The array is copied.
+	 *
+	 * @throws Exception Thrown, if the services for the JobMaster could not be started.
+	 */
+	public MiniClusterJobDispatcher(
+			Configuration config,
+			HighAvailabilityServices haServices,
+			MetricRegistry metricRegistry,
+			int numJobManagers,
+			RpcService[] rpcServices) throws Exception {
+
+		checkArgument(numJobManagers >= 1);
+		checkNotNull(rpcServices, "rpcServices must not be null");
+		checkArgument(rpcServices.length == numJobManagers,
+				"number of RPC services (" + rpcServices.length +
+				") does not match the number of JobManagers (" + numJobManagers + ")");
+
+		this.configuration = checkNotNull(config);
+		// defensive copy: later changes to the caller's array must not affect this dispatcher
+		this.rpcServices = rpcServices.clone();
+		this.haServices = checkNotNull(haServices);
+		this.metricRegistry = checkNotNull(metricRegistry);
+		this.numJobManagers = numJobManagers;
+
+		LOG.info("Creating JobMaster services");
+		this.jobManagerServices = JobManagerServices.fromConfiguration(config, haServices);
+	}
+
+ // ------------------------------------------------------------------------
+ // life cycle
+ // ------------------------------------------------------------------------
+
+	/**
+	 * Shuts down this dispatcher. The runners of a job that is running at that moment are
+	 * shut down, so that job terminally fails. Calling this method again has no effect.
+	 */
+	public void shutdown() {
+		synchronized (lock) {
+			if (shutdown) {
+				return;
+			}
+			shutdown = true;
+
+			LOG.info("Shutting down the dispatcher");
+
+			// read the volatile field once into a local before clearing it
+			final JobManagerRunner[] activeRunners = this.runners;
+			if (activeRunners == null) {
+				return;
+			}
+			this.runners = null;
+
+			for (int idx = 0; idx < activeRunners.length; idx++) {
+				activeRunners[idx].shutdown();
+			}
+		}
+	}
+
+ // ------------------------------------------------------------------------
+ // submitting jobs
+ // ------------------------------------------------------------------------
+
+	/**
+	 * Executes a job in detached mode: the JobManager(s) for the job are started and the
+	 * method returns without waiting for the job to finish.
+	 *
+	 * @param job The Flink job to execute
+	 *
+	 * @throws JobExecutionException Thrown if the JobManager(s) for the job could not be started.
+	 * @throws IllegalStateException Thrown if the dispatcher is shut down or already runs a job.
+	 */
+	public void runDetached(JobGraph job) throws JobExecutionException {
+		checkNotNull(job);
+		LOG.info("Received job for detached execution {} ({})", job.getName(), job.getJobID());
+
+		synchronized (lock) {
+			checkState(!shutdown, "mini cluster is shut down");
+			checkState(runners == null, "mini cluster can only execute one job at a time");
+
+			// the same object acts as completion callback and as fatal error handler
+			final DetachedFinalizer finalizer = new DetachedFinalizer(numJobManagers);
+			final JobManagerRunner[] startedRunners = startJobRunners(job, finalizer, finalizer);
+			this.runners = startedRunners;
+		}
+	}
+
+	/**
+	 * This method runs a job in blocking mode. The method returns only after the job
+	 * completed successfully, or after it failed terminally.
+	 *
+	 * <p>If the calling thread is interrupted while waiting, the job's JobManager runners
+	 * are shut down before the {@link InterruptedException} is rethrown. Otherwise they
+	 * would keep running without any reference left through which they could be stopped.
+	 *
+	 * @param job The Flink job to execute
+	 * @return The result of the job execution
+	 *
+	 * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
+	 *         or if the job terminally failed.
+	 * @throws InterruptedException Thrown if the calling thread is interrupted while waiting.
+	 */
+	public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
+		checkNotNull(job);
+
+		LOG.info("Received job for blocking execution {} ({})", job.getName(), job.getJobID());
+		final BlockingJobSync sync = new BlockingJobSync(job.getJobID(), numJobManagers);
+
+		final JobManagerRunner[] startedRunners;
+		synchronized (lock) {
+			checkState(!shutdown, "mini cluster is shut down");
+			checkState(runners == null, "mini cluster can only execute one job at a time");
+
+			startedRunners = startJobRunners(job, sync, sync);
+			this.runners = startedRunners;
+		}
+
+		try {
+			return sync.getResult();
+		}
+		catch (InterruptedException e) {
+			// the caller stopped waiting. the 'finally' below drops the reference to the runners,
+			// so unless they are shut down here, not even shutdown() could stop them anymore.
+			// skip this if shutdown() already took (and shut down) these runners.
+			synchronized (lock) {
+				if (this.runners == startedRunners) {
+					for (JobManagerRunner runner : startedRunners) {
+						try {
+							runner.shutdown();
+						}
+						catch (Throwable t) {
+							LOG.warn("Error while shutting down JobManagerRunner after interruption", t);
+						}
+					}
+				}
+			}
+			throw e;
+		}
+		finally {
+			// always clear the status for the next job
+			runners = null;
+		}
+	}
+
+	/**
+	 * Marks the job as running in the high-availability services' running-jobs registry and
+	 * creates and starts one {@link JobManagerRunner} per configured JobManager for it.
+	 *
+	 * <p>If creating or starting any runner fails, all runners created so far are shut down
+	 * (best effort), the job is marked as finished in the running-jobs registry again, and a
+	 * {@link JobExecutionException} is thrown.
+	 *
+	 * @param job The job to start the runners for
+	 * @param onCompletion Callbacks invoked when the job finishes, fails, or is finished by another JobManager
+	 * @param errorHandler Handler for fatal errors of the runners
+	 * @return The started runners, one per JobManager
+	 *
+	 * @throws JobExecutionException Thrown if the job could not be registered, or if the
+	 *                               runners could not be created or started.
+	 */
+	private JobManagerRunner[] startJobRunners(
+			JobGraph job,
+			OnCompletionActions onCompletion,
+			FatalErrorHandler errorHandler) throws JobExecutionException {
+
+		LOG.info("Starting {} JobMaster(s) for job {} ({})", numJobManagers, job.getName(), job.getJobID());
+
+		// we first need to mark the job as running in the HA services, so that the
+		// JobManager leader will recognize it as work to do
+		try {
+			haServices.getRunningJobsRegistry().setJobRunning(job.getJobID());
+		}
+		catch (Throwable t) {
+			throw new JobExecutionException(job.getJobID(),
+					"Could not register the job at the high-availability services", t);
+		}
+
+		// start all JobManagers
+		JobManagerRunner[] runners = new JobManagerRunner[numJobManagers];
+		for (int i = 0; i < numJobManagers; i++) {
+			try {
+				runners[i] = new JobManagerRunner(job, configuration,
+						rpcServices[i], haServices, jobManagerServices, metricRegistry,
+						onCompletion, errorHandler);
+				runners[i].start();
+			}
+			catch (Throwable t) {
+				// shut down all the ones created so far; runners[i] is null if its own
+				// constructor was what failed
+				for (int k = 0; k <= i; k++) {
+					try {
+						if (runners[k] != null) {
+							runners[k].shutdown();
+						}
+					} catch (Throwable ignored) {
+						// silent shutdown: the original failure 't' is reported below
+					}
+				}
+
+				// un-register the job from the high-availability services
+				try {
+					haServices.getRunningJobsRegistry().setJobFinished(job.getJobID());
+				}
+				catch (Throwable tt) {
+					LOG.warn("Could not properly unregister job from high-availability services", tt);
+				}
+
+				throw new JobExecutionException(job.getJobID(), "Could not start the JobManager(s) for the job", t);
+			}
+		}
+
+		return runners;
+	}
+
+ // ------------------------------------------------------------------------
+ // test methods to simulate job master failures
+ // ------------------------------------------------------------------------
+
+// public void killJobMaster(int which) {
+// checkArgument(which >= 0 && which < numJobManagers, "no such job master");
+// checkState(!shutdown, "mini cluster is shut down");
+//
+// JobManagerRunner[] runners = this.runners;
+// checkState(runners != null, "mini cluster is not executing a job right now");
+//
+// runners[which].shutdown(new Throwable("kill JobManager"));
+// }
+
+ // ------------------------------------------------------------------------
+ // utility classes
+ // ------------------------------------------------------------------------
+
+	/**
+	 * Completion callback for jobs submitted in detached mode. Every runner reporting back
+	 * (job finished, job failed, job finished by another JobManager, or fatal error) counts
+	 * as one runner done. When the last runner is done, the dispatcher's {@code runners}
+	 * field is reset so that the mini cluster can accept a new job. In a high-availability
+	 * test setup there may be several runners for the same job.
+	 */
+	private class DetachedFinalizer implements OnCompletionActions, FatalErrorHandler {
+
+		/** Number of runners that have not reported back yet. */
+		private final AtomicInteger pendingRunners;
+
+		private DetachedFinalizer(int numJobManagersToWaitFor) {
+			this.pendingRunners = new AtomicInteger(numJobManagersToWaitFor);
+		}
+
+		@Override
+		public void jobFinished(JobExecutionResult result) {
+			markRunnerDone();
+		}
+
+		@Override
+		public void jobFailed(Throwable cause) {
+			markRunnerDone();
+		}
+
+		@Override
+		public void jobFinishedByOther() {
+			markRunnerDone();
+		}
+
+		@Override
+		public void onFatalError(Throwable exception) {
+			markRunnerDone();
+		}
+
+		/** Counts one runner as done and frees the dispatcher once no runner is pending. */
+		private void markRunnerDone() {
+			final int stillPending = pendingRunners.decrementAndGet();
+			if (stillPending != 0) {
+				return;
+			}
+			MiniClusterJobDispatcher.this.runners = null;
+		}
+	}
+
+ // ------------------------------------------------------------------------
+
+	/**
+	 * This class is used to sync on blocking jobs across multiple runners.
+	 * Only after all runners reported back (finished, failed, finished by another
+	 * JobManager, or encountered a fatal error), the result will be released.
+	 *
+	 * <p>That way it is guaranteed that after the blocking job submit call returns,
+	 * the dispatcher is immediately free to accept another job.
+	 */
+	private static class BlockingJobSync implements OnCompletionActions, FatalErrorHandler {
+
+		private final JobID jobId;
+
+		/** Counted down once per runner that reports back. */
+		private final CountDownLatch jobMastersToWaitFor;
+
+		private volatile Throwable jobException;
+
+		/** The first fatal error reported by any runner. */
+		private volatile Throwable runnerException;
+
+		private volatile JobExecutionResult result;
+
+		BlockingJobSync(JobID jobId, int numJobMastersToWaitFor) {
+			this.jobId = jobId;
+			this.jobMastersToWaitFor = new CountDownLatch(numJobMastersToWaitFor);
+		}
+
+		@Override
+		public void jobFinished(JobExecutionResult jobResult) {
+			this.result = jobResult;
+			jobMastersToWaitFor.countDown();
+		}
+
+		@Override
+		public void jobFailed(Throwable cause) {
+			jobException = cause;
+			jobMastersToWaitFor.countDown();
+		}
+
+		@Override
+		public void jobFinishedByOther() {
+			this.jobMastersToWaitFor.countDown();
+		}
+
+		@Override
+		public void onFatalError(Throwable exception) {
+			if (runnerException == null) {
+				runnerException = exception;
+			}
+			// count the failed runner as having reported back (as the detached finalizer
+			// does). Without this, getResult() blocks forever when every runner fails
+			// fatally, and its "all JobManagers encountered fatal errors" case can never
+			// be reached.
+			// NOTE(review): if a runner can report both a fatal error and a completion
+			// callback, it is counted twice here - confirm against JobManagerRunner.
+			jobMastersToWaitFor.countDown();
+		}
+
+		/**
+		 * Waits until all runners have reported back and then returns the job result
+		 * or throws the job's failure.
+		 *
+		 * @return The result of the successfully completed job
+		 * @throws JobExecutionException Thrown if the job failed, or if no result was
+		 *                               produced because the runners hit fatal errors
+		 * @throws InterruptedException Thrown if interrupted while waiting for the runners
+		 */
+		public JobExecutionResult getResult() throws JobExecutionException, InterruptedException {
+			jobMastersToWaitFor.await();
+
+			final Throwable jobFailureCause = this.jobException;
+			final Throwable runnerException = this.runnerException;
+			final JobExecutionResult result = this.result;
+
+			// (1) we check if the job terminated with an exception
+			// (2) we check whether the job completed successfully
+			// (3) we check if we have exceptions from the JobManagers. the job may still have
+			//     completed successfully in that case, if multiple JobMasters were running
+			//     and other took over. only if all encounter a fatal error, the job cannot finish
+
+			if (jobFailureCause != null) {
+				if (jobFailureCause instanceof JobExecutionException) {
+					throw (JobExecutionException) jobFailureCause;
+				}
+				else {
+					throw new JobExecutionException(jobId, "The job execution failed", jobFailureCause);
+				}
+			}
+			else if (result != null) {
+				return result;
+			}
+			else if (runnerException != null) {
+				throw new JobExecutionException(jobId,
+						"The job execution failed because all JobManagers encountered fatal errors", runnerException);
+			}
+			else {
+				throw new IllegalStateException("Bug: Job finished with neither error nor result.");
+			}
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/655722a2/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 6f6d525..3122804 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -188,7 +188,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
} else {
try {
LeaderRetrievalService jobMasterLeaderRetriever =
- highAvailabilityServices.getJobManagerLeaderRetriever(jobID, jobMasterAddress);
+ highAvailabilityServices.getJobManagerLeaderRetriever(jobID);
jobIdLeaderListener = new JobIdLeaderListener(jobID, jobMasterLeaderRetriever);
} catch (Exception e) {
log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
http://git-wip-us.apache.org/repos/asf/flink/blob/655722a2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index 9e71349..e7f52e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -191,8 +191,7 @@ public class JobLeaderService {
LOG.info("Add job {} for job leader monitoring.", jobId);
final LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(
- jobId,
- defaultTargetAddress);
+ jobId);
JobLeaderService.JobManagerLeaderListener jobManagerLeaderListener = new JobManagerLeaderListener(jobId);
http://git-wip-us.apache.org/repos/asf/flink/blob/655722a2/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 3e88e8c..877812b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -91,7 +91,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
}
@Override
- public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) throws Exception {
+ public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception {
LeaderRetrievalService service = this.jobMasterLeaderRetrievers.get(jobID);
if (service != null) {
return service;
http://git-wip-us.apache.org/repos/asf/flink/blob/655722a2/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
new file mode 100644
index 0000000..dd43337
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+/**
+ * Integration test cases for the {@link MiniCluster}.
+ *
+ * <p>NOTE: the test methods are currently disabled (their {@code @Test} annotations are
+ * commented out).
+ */
+public class MiniClusterITCase extends TestLogger {
+
+//	@Test
+	public void runJobWithSingleRpcService() throws Exception {
+		MiniClusterConfiguration cfg = new MiniClusterConfiguration();
+
+		// should be the default, but set anyways to make sure the test
+		// stays valid when the default changes
+		cfg.setUseSingleRpcService();
+
+		MiniCluster miniCluster = new MiniCluster(cfg);
+		executeJob(miniCluster);
+	}
+
+//	@Test
+	public void runJobWithMultipleRpcServices() throws Exception {
+		MiniClusterConfiguration cfg = new MiniClusterConfiguration();
+		cfg.setUseRpcServicePerComponent();
+
+		MiniCluster miniCluster = new MiniCluster(cfg);
+		executeJob(miniCluster);
+	}
+
+//	@Test
+	public void runJobWithMultipleJobManagers() throws Exception {
+		MiniClusterConfiguration cfg = new MiniClusterConfiguration();
+		cfg.setNumJobManagers(3);
+
+		MiniCluster miniCluster = new MiniCluster(cfg);
+		executeJob(miniCluster);
+	}
+
+	/**
+	 * Starts the given mini cluster and runs a simple job on it in blocking mode. The
+	 * cluster is always shut down afterwards, also when starting it or running the job
+	 * fails, so that its services do not leak into subsequent tests.
+	 */
+	private static void executeJob(MiniCluster miniCluster) throws Exception {
+		try {
+			miniCluster.start();
+
+			JobGraph job = getSimpleJob();
+			miniCluster.runJobBlocking(job);
+		}
+		finally {
+			miniCluster.shutdown();
+		}
+	}
+
+	/** Creates a job with a single no-op task of parallelism 1. */
+	private static JobGraph getSimpleJob() {
+		JobVertex task = new JobVertex("Test task");
+		task.setParallelism(1);
+		task.setMaxParallelism(1);
+		task.setInvokableClass(NoOpInvokable.class);
+
+		return new JobGraph(new JobID(), "Test Job", task);
+	}
+}
[5/5] flink git commit: [FLINK-4351] [cluster management] JobManager
handle TaskManager's registration
Posted by se...@apache.org.
[FLINK-4351] [cluster management] JobManager handle TaskManager's registration
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a19cae3b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a19cae3b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a19cae3b
Branch: refs/heads/flip-6
Commit: a19cae3b07963776c07c0aae7bee806004f59429
Parents: e91b82d
Author: Kurt Young <yk...@gmail.com>
Authored: Sun Oct 16 23:00:57 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Oct 16 22:14:41 2016 +0200
----------------------------------------------------------------------
.../flink/runtime/jobmaster/JobMaster.java | 75 +++++++++++++++++---
.../runtime/jobmaster/JobMasterGateway.java | 15 ++--
.../runtime/taskexecutor/JobLeaderService.java | 55 ++++++--------
.../runtime/taskexecutor/TaskExecutor.java | 2 +-
.../taskexecutor/TaskManagerServices.java | 2 +-
.../runtime/taskexecutor/TaskExecutorTest.java | 10 ++-
6 files changed, 102 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a19cae3b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 05c20d3..8cb9946 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
@@ -36,7 +37,9 @@ import org.apache.flink.runtime.client.SerializedJobExecutionResult;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.Execution;
@@ -81,7 +84,9 @@ import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.StartStoppable;
import org.apache.flink.runtime.state.CheckpointStateHandles;
import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.SerializedThrowable;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.SerializedValue;
@@ -89,8 +94,10 @@ import org.slf4j.Logger;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -122,6 +129,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
/** Configuration of the JobManager */
private final Configuration configuration;
+ private final Time rpcTimeout;
+
/** Service to contend for and retrieve the leadership of JM and RM */
private final HighAvailabilityServices highAvailabilityServices;
@@ -152,7 +161,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
private volatile UUID leaderSessionID;
- // --------- resource manager --------
+ // --------- ResourceManager --------
/** Leader retriever service used to locate ResourceManager's address */
private LeaderRetrievalService resourceManagerLeaderRetriever;
@@ -160,6 +169,9 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
/** Connection with ResourceManager, null if not located address yet or we close it initiative */
private ResourceManagerConnection resourceManagerConnection;
+ // --------- TaskManagers --------
+
+ private final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTaskManagers;
// ------------------------------------------------------------------------
@@ -181,6 +193,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
this.jobGraph = checkNotNull(jobGraph);
this.configuration = checkNotNull(configuration);
+ this.rpcTimeout = rpcAskTimeout;
this.highAvailabilityServices = checkNotNull(highAvailabilityService);
this.libraryCacheManager = checkNotNull(libraryCacheManager);
this.executionContext = checkNotNull(executorService);
@@ -243,6 +256,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
this.slotPool = new SlotPool(executorService);
this.allocationTimeout = Time.of(5, TimeUnit.SECONDS);
+
+ this.registeredTaskManagers = new HashMap<>(4);
}
//----------------------------------------------------------------------------------------------
@@ -379,8 +394,10 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
}
closeResourceManagerConnection();
- // TODO: disconnect from all registered task managers
-
+ for (ResourceID taskManagerId : registeredTaskManagers.keySet()) {
+ slotPool.releaseResource(taskManagerId);
+ }
+ registeredTaskManagers.clear();
}
//----------------------------------------------------------------------------------------------
@@ -662,11 +679,53 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
}
@RpcMethod
- public RegistrationResponse registerTaskManager(
- final String taskManagerAddress,
- final ResourceID taskManagerProcessId,
- final UUID leaderId) {
- throw new UnsupportedOperationException("Has to be implemented.");
+ public Future<RegistrationResponse> registerTaskManager(
+ final TaskManagerLocation taskManagerLocation,
+ final UUID leaderId) throws Exception
+ {
+ if (!JobMaster.this.leaderSessionID.equals(leaderId)) {
+ log.warn("Discard registration from TaskExecutor {} at ({}) because the expected " +
+ "leader session ID {} did not equal the received leader session ID {}.",
+ taskManagerLocation.getResourceID(), taskManagerLocation.addressString(),
+ JobMaster.this.leaderSessionID, leaderId);
+ throw new Exception("Leader id not match, expected: " + JobMaster.this.leaderSessionID
+ + ", actual: " + leaderId);
+ }
+
+ final ResourceID taskManagerId = taskManagerLocation.getResourceID();
+
+ if (registeredTaskManagers.containsKey(taskManagerId)) {
+ final RegistrationResponse response = new JMTMRegistrationSuccess(
+ taskManagerId, libraryCacheManager.getBlobServerPort());
+ return FlinkCompletableFuture.completed(response);
+ } else {
+ return getRpcService().execute(new Callable<TaskExecutorGateway>() {
+ @Override
+ public TaskExecutorGateway call() throws Exception {
+ return getRpcService().connect(taskManagerLocation.addressString(), TaskExecutorGateway.class)
+ .get(rpcTimeout.getSize(), rpcTimeout.getUnit());
+ }
+ }).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
+ @Override
+ public RegistrationResponse apply(TaskExecutorGateway taskExecutorGateway, Throwable throwable) {
+ if (throwable != null) {
+ return new RegistrationResponse.Decline(throwable.getMessage());
+ }
+
+ if (!JobMaster.this.leaderSessionID.equals(leaderId)) {
+ log.warn("Discard registration from TaskExecutor {} at ({}) because the expected " +
+ "leader session ID {} did not equal the received leader session ID {}.",
+ taskManagerLocation.getResourceID(), taskManagerLocation.addressString(),
+ JobMaster.this.leaderSessionID, leaderId);
+ return new RegistrationResponse.Decline("Invalid leader session id");
+ }
+
+ slotPool.registerResource(taskManagerId);
+ registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, taskExecutorGateway));
+ return new JMTMRegistrationSuccess(taskManagerId, libraryCacheManager.getBlobServerPort());
+ }
+ }, getMainThreadExecutor());
+ }
}
//----------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a19cae3b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 0f155a4..4c85839 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import java.util.UUID;
@@ -195,15 +196,13 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
/**
* Register the task manager at the job manager.
*
- * @param taskManagerAddress address of the task manager
- * @param taskManagerProcessId identifying the task manager
- * @param leaderId identifying the job leader
- * @param timeout for the rpc call
+ * @param taskManagerLocation location of the task manager
+ * @param leaderId identifying the job leader
+ * @param timeout for the rpc call
* @return Future registration response indicating whether the registration was successful or not
*/
Future<RegistrationResponse> registerTaskManager(
- final String taskManagerAddress,
- final ResourceID taskManagerProcessId,
- final UUID leaderId,
- @RpcTimeout final Time timeout);
+ final TaskManagerLocation taskManagerLocation,
+ final UUID leaderId,
+ @RpcTimeout final Time timeout);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a19cae3b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index e7f52e2..14d36ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
@@ -32,6 +31,7 @@ import org.apache.flink.runtime.registration.RegisteredRpcConnection;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.registration.RetryingRegistration;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,8 +53,8 @@ public class JobLeaderService {
private static final Logger LOG = LoggerFactory.getLogger(JobLeaderService.class);
- /** Process id of the owning process */
- private final ResourceID ownerProcessId;
+ /** Self's location, used for the job manager connection */
+ private final TaskManagerLocation ownLocation;
/** The leader retrieval service and listener for each registered job */
private final Map<JobID, Tuple2<LeaderRetrievalService, JobLeaderService.JobManagerLeaderListener>> jobLeaderServices;
@@ -62,9 +62,6 @@ public class JobLeaderService {
/** Internal state of the service */
private volatile JobLeaderService.State state;
- /** Address of the owner of this service. This address is used for the job manager connection */
- private String ownerAddress;
-
/** Rpc service to use for establishing connections */
private RpcService rpcService;
@@ -74,14 +71,13 @@ public class JobLeaderService {
/** Job leader listener listening for job leader changes */
private JobLeaderListener jobLeaderListener;
- public JobLeaderService(ResourceID ownerProcessId) {
- this.ownerProcessId = Preconditions.checkNotNull(ownerProcessId);
+ public JobLeaderService(TaskManagerLocation location) {
+ this.ownLocation = Preconditions.checkNotNull(location);
jobLeaderServices = new HashMap<>(4);
state = JobLeaderService.State.CREATED;
- ownerAddress = null;
rpcService = null;
highAvailabilityServices = null;
jobLeaderListener = null;
@@ -94,13 +90,11 @@ public class JobLeaderService {
/**
* Start the job leader service with the given services.
*
- * @param initialOwnerAddress to be used for establishing connections (source address)
* @param initialRpcService to be used to create rpc connections
* @param initialHighAvailabilityServices to create leader retrieval services for the different jobs
* @param initialJobLeaderListener listening for job leader changes
*/
public void start(
- final String initialOwnerAddress,
final RpcService initialRpcService,
final HighAvailabilityServices initialHighAvailabilityServices,
final JobLeaderListener initialJobLeaderListener) {
@@ -110,7 +104,6 @@ public class JobLeaderService {
} else {
LOG.info("Start job leader service.");
- this.ownerAddress = Preconditions.checkNotNull(initialOwnerAddress);
this.rpcService = Preconditions.checkNotNull(initialRpcService);
this.highAvailabilityServices = Preconditions.checkNotNull(initialHighAvailabilityServices);
this.jobLeaderListener = Preconditions.checkNotNull(initialJobLeaderListener);
@@ -311,14 +304,13 @@ public class JobLeaderService {
@Override
protected RetryingRegistration<JobMasterGateway, JMTMRegistrationSuccess> generateRegistration() {
return new JobLeaderService.JobManagerRetryingRegistration(
- LOG,
- rpcService,
- "JobManager",
- JobMasterGateway.class,
- getTargetAddress(),
- getTargetLeaderId(),
- ownerAddress,
- ownerProcessId);
+ LOG,
+ rpcService,
+ "JobManager",
+ JobMasterGateway.class,
+ getTargetAddress(),
+ getTargetLeaderId(),
+ ownLocation);
}
@Override
@@ -349,10 +341,11 @@ public class JobLeaderService {
/**
* Retrying registration for the job manager <--> task manager connection.
*/
- private static final class JobManagerRetryingRegistration extends RetryingRegistration<JobMasterGateway, JMTMRegistrationSuccess> {
+ private static final class JobManagerRetryingRegistration
+ extends RetryingRegistration<JobMasterGateway, JMTMRegistrationSuccess>
+ {
- private final String taskManagerAddress;
- private final ResourceID taskManagerProcessId;
+ private final TaskManagerLocation taskManagerLocation;
JobManagerRetryingRegistration(
Logger log,
@@ -361,22 +354,18 @@ public class JobLeaderService {
Class<JobMasterGateway> targetType,
String targetAddress,
UUID leaderId,
- String taskManagerAddress,
- ResourceID taskManagerProcessId) {
+ TaskManagerLocation taskManagerLocation) {
super(log, rpcService, targetName, targetType, targetAddress, leaderId);
- this.taskManagerAddress = Preconditions.checkNotNull(taskManagerAddress);
- this.taskManagerProcessId = Preconditions.checkNotNull(taskManagerProcessId);
+ this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
}
@Override
- protected Future<RegistrationResponse> invokeRegistration(JobMasterGateway gateway, UUID leaderId, long timeoutMillis) throws Exception {
- return gateway.registerTaskManager(
- taskManagerAddress,
- taskManagerProcessId,
- leaderId,
- Time.milliseconds(timeoutMillis));
+ protected Future<RegistrationResponse> invokeRegistration(
+ JobMasterGateway gateway, UUID leaderId, long timeoutMillis) throws Exception
+ {
+ return gateway.registerTaskManager(taskManagerLocation, leaderId, Time.milliseconds(timeoutMillis));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a19cae3b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 3e3a544..1201281 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -206,7 +206,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
taskSlotTable.start(new SlotActionsImpl());
// start the job leader service
- jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
+ jobLeaderService.start(getRpcService(), haServices, new JobLeaderListenerImpl());
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/a19cae3b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 7575ba3..e8de1b1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -207,7 +207,7 @@ public class TaskManagerServices {
final JobManagerTable jobManagerTable = new JobManagerTable();
- final JobLeaderService jobLeaderService = new JobLeaderService(resourceID);
+ final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
return new TaskManagerServices(
taskManagerLocation,
http://git-wip-us.apache.org/repos/asf/flink/blob/a19cae3b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 23c6833..2220f12 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -350,7 +350,7 @@ public class TaskExecutorTest extends TestLogger {
final TimerService<AllocationID> timerService = mock(TimerService.class);
final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(mock(ResourceProfile.class)), timerService);
final JobManagerTable jobManagerTable = new JobManagerTable();
- final JobLeaderService jobLeaderService = new JobLeaderService(resourceId);
+ final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
final TestingLeaderRetrievalService resourceManagerLeaderRetrievalService = new TestingLeaderRetrievalService();
@@ -379,8 +379,7 @@ public class TaskExecutorTest extends TestLogger {
final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
when(jobMasterGateway.registerTaskManager(
- any(String.class),
- eq(resourceId),
+ eq(taskManagerLocation),
eq(jobManagerLeaderId),
any(Time.class)
)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
@@ -451,7 +450,7 @@ public class TaskExecutorTest extends TestLogger {
final TimerService<AllocationID> timerService = mock(TimerService.class);
final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(mock(ResourceProfile.class), mock(ResourceProfile.class)), timerService);
final JobManagerTable jobManagerTable = new JobManagerTable();
- final JobLeaderService jobLeaderService = new JobLeaderService(resourceId);
+ final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
final String resourceManagerAddress = "rm";
@@ -484,8 +483,7 @@ public class TaskExecutorTest extends TestLogger {
final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
when(jobMasterGateway.registerTaskManager(
- any(String.class),
- eq(resourceId),
+ eq(taskManagerLocation),
eq(jobManagerLeaderId),
any(Time.class)
)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
[4/5] flink git commit: [FLINK-4689] [cluster management] Implement a
simple slot provider for the new job manager
Posted by se...@apache.org.
[FLINK-4689] [cluster management] Implement a simple slot provider for the new job manager
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e91b82d3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e91b82d3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e91b82d3
Branch: refs/heads/flip-6
Commit: e91b82d3c868e18611064f905b345906f1414f84
Parents: 655722a
Author: Kurt Young <yk...@gmail.com>
Authored: Sun Oct 16 22:20:38 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Oct 16 22:14:41 2016 +0200
----------------------------------------------------------------------
.../apache/flink/runtime/instance/SlotPool.java | 1 -
.../jobmanager/slots/PooledSlotProvider.java | 73 ++++++++++++++++++++
.../flink/runtime/jobmaster/JobMaster.java | 24 ++++---
3 files changed, 89 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e91b82d3/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index e7857c1..de952c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -135,7 +135,6 @@ public class SlotPool implements SlotOwner {
internalAllocateSlot(jobID, allocationID, resourceProfile, future);
- final SlotOwner owner = this;
return future.thenApplyAsync(
new ApplyFunction<SlotDescriptor, SimpleSlot>() {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/e91b82d3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java
new file mode 100644
index 0000000..5655fc2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.slots;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.SlotPool;
+import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A simple pool based slot provider with {@link SlotPool} as the underlying storage.
+ *
+ * <p>Allocation is currently synchronous: {@link #allocateSlot(ScheduledUnit, boolean)} blocks
+ * the calling thread for up to the configured timeout while waiting on the pool's future, and
+ * then returns an already-completed future.
+ */
+public class PooledSlotProvider implements SlotProvider {
+
+	/** The pool which holds all the slots. */
+	private final SlotPool slotPool;
+
+	/** The maximum time to wait for the pool to fulfil a single allocation. */
+	private final Time timeout;
+
+	/**
+	 * Creates a slot provider backed by the given pool.
+	 *
+	 * @param slotPool The pool to allocate slots from
+	 * @param timeout The maximum time to block for a single slot allocation
+	 */
+	public PooledSlotProvider(final SlotPool slotPool, final Time timeout) {
+		this.slotPool = checkNotNull(slotPool);
+		this.timeout = checkNotNull(timeout);
+	}
+
+	/**
+	 * Allocates a slot for the job of the given task, blocking until the pool provides one or
+	 * the timeout elapses.
+	 *
+	 * <p>The {@code allowQueued} flag is currently ignored; every request is handed to the
+	 * pool and waited on.
+	 *
+	 * @param task The task to allocate a slot for
+	 * @param allowQueued Ignored by this implementation
+	 * @return An already-completed future holding the allocated slot
+	 * @throws NoResourceAvailableException If the allocation fails, times out, or the waiting
+	 *                                      thread is interrupted
+	 */
+	@Override
+	public Future<SimpleSlot> allocateSlot(ScheduledUnit task,
+			boolean allowQueued) throws NoResourceAvailableException
+	{
+		checkNotNull(task);
+
+		final JobID jobID = task.getTaskToExecute().getVertex().getJobId();
+		final Future<SimpleSlot> future = slotPool.allocateSimpleSlot(jobID, ResourceProfile.UNKNOWN);
+		try {
+			final SimpleSlot slot = future.get(timeout.getSize(), timeout.getUnit());
+			return FlinkCompletableFuture.completed(slot);
+		} catch (InterruptedException e) {
+			// restore the interrupt flag so that callers further up the stack can still observe it
+			Thread.currentThread().interrupt();
+			throw withCause(new NoResourceAvailableException(
+					"Could not allocate a slot because it's interrupted."), e);
+		} catch (ExecutionException e) {
+			// keep the original failure as the cause instead of flattening it into the message only
+			throw withCause(new NoResourceAvailableException("Could not allocate a slot because some error occurred " +
+					"during allocation, " + e.getMessage()), e.getCause());
+		} catch (TimeoutException e) {
+			throw new NoResourceAvailableException("Could not allocate a slot within time limit: " + timeout);
+		}
+	}
+
+	/** Attaches {@code cause} to {@code exception} and returns the exception so it can be thrown. */
+	private static NoResourceAvailableException withCause(NoResourceAvailableException exception, Throwable cause) {
+		exception.initCause(cause);
+		return exception;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e91b82d3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index a7be476..05c20d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -49,6 +49,7 @@ import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.instance.SlotPool;
import org.apache.flink.runtime.io.network.PartitionState;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -56,7 +57,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.jobmanager.slots.PooledSlotProvider;
import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -84,7 +85,6 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.util.SerializedThrowable;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.SerializedValue;
-
import org.slf4j.Logger;
import javax.annotation.Nullable;
@@ -93,6 +93,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -145,6 +146,9 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
/** The execution graph of this job */
private final ExecutionGraph executionGraph;
+ private final SlotPool slotPool;
+
+ private final Time allocationTimeout;
private volatile UUID leaderSessionID;
@@ -156,8 +160,6 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
/** Connection with ResourceManager, null if not located address yet or we close it initiative */
private ResourceManagerConnection resourceManagerConnection;
- // TODO - we need to replace this with the slot pool
- private final Scheduler scheduler;
// ------------------------------------------------------------------------
@@ -239,8 +241,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
-1,
log);
- // TODO - temp fix
- this.scheduler = new Scheduler(executorService);
+ this.slotPool = new SlotPool(executorService);
+ this.allocationTimeout = Time.of(5, TimeUnit.SECONDS);
}
//----------------------------------------------------------------------------------------------
@@ -262,6 +264,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
if (LEADER_ID_UPDATER.compareAndSet(this, null, leaderSessionID)) {
super.start();
+ slotPool.setJobManagerLeaderId(leaderSessionID);
log.info("Starting JobManager for job {} ({})", jobGraph.getName(), jobGraph.getJobID());
getSelf().startJobExecution();
} else {
@@ -337,7 +340,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
@Override
public void run() {
try {
- executionGraph.scheduleForExecution(scheduler);
+ executionGraph.scheduleForExecution(new PooledSlotProvider(slotPool, allocationTimeout));
} catch (Throwable t) {
executionGraph.fail(t);
}
@@ -365,6 +368,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
((StartStoppable) getSelf()).stop();
leaderSessionID = null;
+ slotPool.setJobManagerLeaderId(null);
executionGraph.suspend(cause);
// disconnect from resource manager:
@@ -783,9 +787,12 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
// TODO - add tests for comment in https://github.com/apache/flink/pull/2565
// verify the response with current connection
if (resourceManagerConnection != null
- && resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId())) {
+ && resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId()))
+ {
log.info("JobManager successfully registered at ResourceManager, leader id: {}.",
success.getResourceManagerLeaderId());
+ slotPool.setResourceManager(success.getResourceManagerLeaderId(),
+ resourceManagerConnection.getTargetGateway());
}
}
});
@@ -796,6 +803,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
resourceManagerConnection.close();
resourceManagerConnection = null;
}
+ slotPool.disconnectResourceManager();
}
//----------------------------------------------------------------------------------------------