You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/21 12:22:30 UTC
[50/50] [abbrv] flink git commit: [FLINK-4836] [cluster management]
Start ResourceManager in MiniCluster
[FLINK-4836] [cluster management] Start ResourceManager in MiniCluster
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7041f934
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7041f934
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7041f934
Branch: refs/heads/flip-6
Commit: 7041f93442edff3dd7a525a3148ca8bddd6bfd42
Parents: b9cebaf
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Oct 17 17:02:33 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Oct 21 14:18:00 2016 +0200
----------------------------------------------------------------------
.../flink/runtime/minicluster/MiniCluster.java | 126 +++++++++++++++----
.../minicluster/MiniClusterJobDispatcher.java | 4 +-
.../flink/runtime/util/ZooKeeperUtils.java | 10 +-
.../flink/runtime/jobmanager/JobSubmitTest.java | 4 +-
.../runtime/taskmanager/TaskAsyncCallTest.java | 6 -
5 files changed, 108 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7041f934/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 1ffcd12..d63f9a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -32,11 +32,18 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.util.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.concurrent.GuardedBy;
import java.util.UUID;
@@ -48,13 +55,15 @@ import static org.apache.flink.util.Preconditions.checkState;
public class MiniCluster {
+ private static final Logger LOG = LoggerFactory.getLogger(MiniCluster.class);
+
/** The lock to guard startup / shutdown / manipulation methods */
private final Object lock = new Object();
/** The configuration for this mini cluster */
private final MiniClusterConfiguration config;
- @GuardedBy("lock")
+ @GuardedBy("lock")
private MetricRegistry metricRegistry;
@GuardedBy("lock")
@@ -73,6 +82,9 @@ public class MiniCluster {
private HighAvailabilityServices haServices;
@GuardedBy("lock")
+ private ResourceManager<?>[] resourceManagers;
+
+ @GuardedBy("lock")
private TaskManagerRunner[] taskManagerRunners;
@GuardedBy("lock")
@@ -98,6 +110,7 @@ public class MiniCluster {
}
/**
+ * Creates a new Flink mini cluster based on the given configuration.
*
* @param config The configuration for the mini cluster
*/
@@ -149,6 +162,9 @@ public class MiniCluster {
synchronized (lock) {
checkState(!running, "FlinkMiniCluster is already running");
+ LOG.info("Starting Flink Mini Cluster");
+ LOG.debug("Using configuration {}", config);
+
final Configuration configuration = new UnmodifiableConfiguration(config.getConfiguration());
final Time rpcTimeout = config.getRpcTimeout();
final int numJobManagers = config.getNumJobManagers();
@@ -210,13 +226,21 @@ public class MiniCluster {
}
// create the high-availability services
+ LOG.info("Starting high-availability services");
haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration);
- // bring up the task managers for the mini cluster
+ // bring up the ResourceManager(s)
+ LOG.info("Starting {} ResourceManager(s)", numResourceManagers);
+ resourceManagers = startResourceManagers(
+ configuration, haServices, metricRegistry, numResourceManagers, resourceManagerRpcServices);
+
+ // bring up the TaskManager(s) for the mini cluster
+ LOG.info("Starting {} TaskManager(s)", numTaskManagers);
taskManagerRunners = startTaskManagers(
configuration, haServices, metricRegistry, numTaskManagers, taskManagerRpcServices);
// bring up the dispatcher that launches JobManagers when jobs submitted
+ LOG.info("Starting job dispatcher for {} JobManager(s)", numJobManagers);
jobDispatcher = new MiniClusterJobDispatcher(
configuration, haServices, metricRegistry, numJobManagers, jobManagerRpcServices);
}
@@ -232,6 +256,8 @@ public class MiniCluster {
// now officially mark this as running
running = true;
+
+ LOG.info("Flink Mini Cluster started successfully");
}
}
@@ -247,11 +273,13 @@ public class MiniCluster {
public void shutdown() throws Exception {
synchronized (lock) {
if (running) {
+ LOG.info("Shutting down Flink Mini Cluster");
try {
shutdownInternally();
} finally {
running = false;
}
+ LOG.info("Flink Mini Cluster is shut down");
}
}
}
@@ -270,11 +298,34 @@ public class MiniCluster {
try {
jobDispatcher.shutdown();
} catch (Exception e) {
- exception = firstOrSuppressed(e, exception);
+ exception = e;
}
jobDispatcher = null;
}
+ if (resourceManagers != null) {
+ for (ResourceManager<?> rm : resourceManagers) {
+ if (rm != null) {
+ try {
+ rm.shutDown();
+ } catch (Throwable t) {
+ exception = firstOrSuppressed(t, exception);
+ }
+ }
+ }
+ resourceManagers = null;
+ }
+
+ // shut down the RpcServices
+ exception = shutDownRpc(commonRpcService, exception);
+ exception = shutDownRpcs(jobManagerRpcServices, exception);
+ exception = shutDownRpcs(taskManagerRpcServices, exception);
+ exception = shutDownRpcs(resourceManagerRpcServices, exception);
+ commonRpcService = null;
+ jobManagerRpcServices = null;
+ taskManagerRpcServices = null;
+ resourceManagerRpcServices = null;
+
// shut down high-availability services
if (haServices != null) {
try {
@@ -285,24 +336,6 @@ public class MiniCluster {
haServices = null;
}
- // shut down the RpcServices
- if (commonRpcService != null) {
- exception = shutDownRpc(commonRpcService, exception);
- commonRpcService = null;
- }
- if (jobManagerRpcServices != null) {
- for (RpcService service : jobManagerRpcServices) {
- exception = shutDownRpc(service, exception);
- }
- jobManagerRpcServices = null;
- }
- if (taskManagerRpcServices != null) {
- for (RpcService service : taskManagerRpcServices) {
- exception = shutDownRpc(service, exception);
- }
- taskManagerRpcServices = null;
- }
-
// metrics shutdown
if (metricRegistry != null) {
metricRegistry.shutdown();
@@ -402,6 +435,28 @@ public class MiniCluster {
return new AkkaRpcService(actorSystem, askTimeout);
}
+ protected ResourceManager<?>[] startResourceManagers(
+ Configuration configuration,
+ HighAvailabilityServices haServices,
+ MetricRegistry metricRegistry,
+ int numResourceManagers,
+ RpcService[] resourceManagerRpcServices) throws Exception {
+
+ final StandaloneResourceManager[] resourceManagers = new StandaloneResourceManager[numResourceManagers];
+ final SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory();
+
+ for (int i = 0; i < numResourceManagers; i++) {
+ resourceManagers[i] = new StandaloneResourceManager(
+ resourceManagerRpcServices[i],
+ haServices,
+ slotManagerFactory);
+
+ resourceManagers[i].start();
+ }
+
+ return resourceManagers;
+ }
+
protected TaskManagerRunner[] startTaskManagers(
Configuration configuration,
HighAvailabilityServices haServices,
@@ -429,15 +484,34 @@ public class MiniCluster {
// ------------------------------------------------------------------------
private static Throwable shutDownRpc(RpcService rpcService, Throwable priorException) {
- try {
- if (rpcService != null) {
+ if (rpcService != null) {
+ try {
rpcService.stopService();
}
- return priorException;
+ catch (Throwable t) {
+ return firstOrSuppressed(t, priorException);
+ }
}
- catch (Throwable t) {
- return firstOrSuppressed(t, priorException);
+
+ return priorException;
+ }
+
+ private static Throwable shutDownRpcs(RpcService[] rpcServices, Throwable priorException) {
+ Throwable exception = priorException;
+
+ if (rpcServices != null) {
+ for (RpcService service : rpcServices) {
+ try {
+ if (service != null) {
+ service.stopService();
+ }
+ }
+ catch (Throwable t) {
+ exception = firstOrSuppressed(t, exception);
+ }
+ }
}
+ return exception;
}
private static MiniClusterConfiguration createConfig(Configuration cfg, boolean singleRpcService) {
http://git-wip-us.apache.org/repos/asf/flink/blob/7041f934/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index d0df293..8ac8eba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -176,7 +176,7 @@ public class MiniClusterJobDispatcher {
public void runDetached(JobGraph job) throws JobExecutionException {
checkNotNull(job);
- LOG.info("Received job for detached execution {} ({})", job.getName(), job.getJobID());
+ LOG.info("Received job for detached execution: {} ({})", job.getName(), job.getJobID());
synchronized (lock) {
checkState(!shutdown, "mini cluster is shut down");
@@ -201,7 +201,7 @@ public class MiniClusterJobDispatcher {
public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
checkNotNull(job);
- LOG.info("Received job for blocking execution {} ({})", job.getName(), job.getJobID());
+ LOG.info("Received job for blocking execution: {} ({})", job.getName(), job.getJobID());
final BlockingJobSync sync = new BlockingJobSync(job.getJobID(), numJobManagers);
synchronized (lock) {
http://git-wip-us.apache.org/repos/asf/flink/blob/7041f934/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index e9777a3..cb5dc31 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -161,7 +161,6 @@ public class ZooKeeperUtils {
*/
public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService(
Configuration configuration) {
- {
final CuratorFramework client = startCuratorFramework(configuration);
return createLeaderRetrievalService(client, configuration);
}
@@ -172,11 +171,10 @@ public class ZooKeeperUtils {
* @param client The {@link CuratorFramework} ZooKeeper client to use
* @param configuration {@link Configuration} object containing the configuration values
* @return {@link ZooKeeperLeaderRetrievalService} instance.
- * @throws Exception
*/
public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService(
final CuratorFramework client,
- final Configuration configuration) throws Exception
+ final Configuration configuration)
{
return createLeaderRetrievalService(client, configuration, "");
}
@@ -188,12 +186,11 @@ public class ZooKeeperUtils {
* @param configuration {@link Configuration} object containing the configuration values
* @param pathSuffix The path suffix which we want to append
* @return {@link ZooKeeperLeaderRetrievalService} instance.
- * @throws Exception
*/
public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService(
final CuratorFramework client,
final Configuration configuration,
- final String pathSuffix) throws Exception
+ final String pathSuffix)
{
String leaderPath = ConfigurationUtil.getStringWithDeprecatedKeys(
configuration,
@@ -240,12 +237,11 @@ public class ZooKeeperUtils {
* @param configuration {@link Configuration} object containing the configuration values
* @param pathSuffix The path suffix which we want to append
* @return {@link ZooKeeperLeaderElectionService} instance.
- * @throws Exception
*/
public static ZooKeeperLeaderElectionService createLeaderElectionService(
final CuratorFramework client,
final Configuration configuration,
- final String pathSuffix) throws Exception
+ final String pathSuffix)
{
final String latchPath = ConfigurationUtil.getStringWithDeprecatedKeys(
configuration,
http://git-wip-us.apache.org/repos/asf/flink/blob/7041f934/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 260b4d4..3c45ccc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -168,6 +168,8 @@ public class JobSubmitTest {
JobVertex jobVertex = new JobVertex("Vertex that fails in initializeOnMaster") {
+ private static final long serialVersionUID = -3540303593784587652L;
+
@Override
public void initializeOnMaster(ClassLoader loader) throws Exception {
throw new RuntimeException("test exception");
@@ -217,7 +219,7 @@ public class JobSubmitTest {
private JobGraph createSimpleJobGraph() {
JobVertex jobVertex = new JobVertex("Vertex");
- jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+ jobVertex.setInvokableClass(NoOpInvokable.class);
List<JobVertexID> vertexIdList = Collections.singletonList(jobVertex.getID());
JobGraph jg = new JobGraph("test job", jobVertex);
http://git-wip-us.apache.org/repos/asf/flink/blob/7041f934/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 2a9ff61..7494d7a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -45,10 +45,6 @@ import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.util.SerializedValue;
@@ -57,9 +53,7 @@ import org.junit.Before;
import org.junit.Test;
import java.net.URL;
-import java.util.Collection;
import java.util.Collections;
-import java.util.List;
import java.util.concurrent.Executor;
import static org.junit.Assert.assertFalse;