You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/17 18:16:50 UTC
[1/4] flink git commit: [hotfix] [cluster management] Remove scala
dependencies from MiniCluster.java
Repository: flink
Updated Branches:
refs/heads/flip-6 4f891a6c2 -> 6dc228fcf
[hotfix] [cluster management] Remove scala dependencies from MiniCluster.java
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f90e5ff2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f90e5ff2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f90e5ff2
Branch: refs/heads/flip-6
Commit: f90e5ff2deaf5eea350ca4a57d8cf77ceb4703b7
Parents: 4f891a6
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Oct 17 14:41:52 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Oct 17 15:51:24 2016 +0200
----------------------------------------------------------------------
.../flink/runtime/minicluster/MiniCluster.java | 12 ++++--------
.../org/apache/flink/runtime/akka/AkkaUtils.scala | 17 +++++++++++++++++
.../runtime/minicluster/MiniClusterITCase.java | 2 +-
3 files changed, 22 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f90e5ff2/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 1ee38e0..d85234d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -35,9 +35,6 @@ import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.util.ExceptionUtils;
-import scala.Option;
-import scala.Tuple2;
-
import javax.annotation.concurrent.GuardedBy;
import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
@@ -348,7 +345,7 @@ public class MiniCluster {
/**
* Factory method to instantiate the RPC service.
*
- * @param config
+ * @param configuration
* The configuration of the mini cluster
* @param askTimeout
* The default RPC timeout for asynchronous "ask" requests.
@@ -360,17 +357,16 @@ public class MiniCluster {
* @return The instantiated RPC service
*/
protected RpcService createRpcService(
- Configuration config,
+ Configuration configuration,
Time askTimeout,
boolean remoteEnabled,
String bindAddress) {
ActorSystem actorSystem;
if (remoteEnabled) {
- Tuple2<String, Object> remoteSettings = new Tuple2<String, Object>(bindAddress, 0);
- actorSystem = AkkaUtils.createActorSystem(config, Option.apply(remoteSettings));
+ actorSystem = AkkaUtils.createActorSystem(configuration, bindAddress, 0);
} else {
- actorSystem = AkkaUtils.createLocalActorSystem(config);
+ actorSystem = AkkaUtils.createLocalActorSystem(configuration);
}
return new AkkaRpcService(actorSystem, askTimeout);
http://git-wip-us.apache.org/repos/asf/flink/blob/f90e5ff2/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 2461340..d98d90a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -55,6 +55,23 @@ object AkkaUtils {
}
/**
+ * Creates an actor system bound to the given hostname and port.
+ *
+ * Convenience overload: wraps the address into `Some((hostname, port))` and delegates to the
+ * `createActorSystem` variant that takes an optional listening address.
+ *
+ * @param configuration instance containing the user provided configuration values
+ * @param hostname of the network interface to bind to
+ * @param port to bind to
+ * @return created actor system
+ */
+ def createActorSystem(
+ configuration: Configuration,
+ hostname: String,
+ port: Int)
+ : ActorSystem = {
+
+ createActorSystem(configuration, Some((hostname, port)))
+ }
+
+ /**
* Creates an actor system. If a listening address is specified, then the actor system will listen
* on that address for messages from a remote actor system. If not, then a local actor system
* will be instantiated.
http://git-wip-us.apache.org/repos/asf/flink/blob/f90e5ff2/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index dd43337..ef53547 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -31,7 +31,7 @@ import org.junit.Test;
*/
public class MiniClusterITCase extends TestLogger {
-// @Test
+ @Test
public void runJobWithSingleRpcService() throws Exception {
MiniClusterConfiguration cfg = new MiniClusterConfiguration();
[3/4] flink git commit: [FLINK-4836] [cluster management] Start
ResourceManager and TaskManager services in MiniCluster
Posted by se...@apache.org.
[FLINK-4836] [cluster management] Start ResourceManager and TaskManager services in MiniCluster
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c041660
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c041660
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c041660
Branch: refs/heads/flip-6
Commit: 6c041660912e99f6bf3e667cc497f7c839f1d10c
Parents: f4831c6
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Oct 17 15:16:59 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Oct 17 17:02:55 2016 +0200
----------------------------------------------------------------------
.../flink/runtime/minicluster/MiniCluster.java | 58 +++++++++++++++++++-
.../minicluster/MiniClusterConfiguration.java | 20 ++++++-
.../minicluster/MiniClusterJobDispatcher.java | 2 +-
.../runtime/taskexecutor/TaskManagerRunner.java | 3 +-
.../runtime/minicluster/MiniClusterITCase.java | 2 +-
5 files changed, 78 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6c041660/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index d85234d..1ffcd12 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -33,10 +34,13 @@ import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.util.ExceptionUtils;
import javax.annotation.concurrent.GuardedBy;
+import java.util.UUID;
+
import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -63,9 +67,15 @@ public class MiniCluster {
private RpcService[] taskManagerRpcServices;
@GuardedBy("lock")
+ private RpcService[] resourceManagerRpcServices;
+
+ @GuardedBy("lock")
private HighAvailabilityServices haServices;
@GuardedBy("lock")
+ private TaskManagerRunner[] taskManagerRunners;
+
+ @GuardedBy("lock")
private MiniClusterJobDispatcher jobDispatcher;
/** Flag marking the mini cluster as started/running */
@@ -143,6 +153,7 @@ public class MiniCluster {
final Time rpcTimeout = config.getRpcTimeout();
final int numJobManagers = config.getNumJobManagers();
final int numTaskManagers = config.getNumTaskManagers();
+ final int numResourceManagers = config.getNumResourceManagers();
final boolean singleRpc = config.getUseSingleRpcSystem();
try {
@@ -150,6 +161,7 @@ public class MiniCluster {
RpcService[] jobManagerRpcServices = new RpcService[numJobManagers];
RpcService[] taskManagerRpcServices = new RpcService[numTaskManagers];
+ RpcService[] resourceManagerRpcServices = new RpcService[numResourceManagers];
// bring up all the RPC services
if (singleRpc) {
@@ -163,11 +175,19 @@ public class MiniCluster {
for (int i = 0; i < numTaskManagers; i++) {
taskManagerRpcServices[i] = commonRpcService;
}
+ for (int i = 0; i < numResourceManagers; i++) {
+ resourceManagerRpcServices[i] = commonRpcService;
+ }
+
+ this.resourceManagerRpcServices = null;
+ this.jobManagerRpcServices = null;
+ this.taskManagerRpcServices = null;
}
else {
// start a new service per component, possibly with custom bind addresses
final String jobManagerBindAddress = config.getJobManagerBindAddress();
final String taskManagerBindAddress = config.getTaskManagerBindAddress();
+ final String resourceManagerBindAddress = config.getResourceManagerBindAddress();
for (int i = 0; i < numJobManagers; i++) {
jobManagerRpcServices[i] = createRpcService(
@@ -179,13 +199,23 @@ public class MiniCluster {
configuration, rpcTimeout, true, taskManagerBindAddress);
}
+ for (int i = 0; i < numResourceManagers; i++) {
+ resourceManagerRpcServices[i] = createRpcService(
+ configuration, rpcTimeout, true, resourceManagerBindAddress);
+ }
+
this.jobManagerRpcServices = jobManagerRpcServices;
this.taskManagerRpcServices = taskManagerRpcServices;
+ this.resourceManagerRpcServices = resourceManagerRpcServices;
}
// create the high-availability services
haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration);
+ // bring up the task managers for the mini cluster
+ taskManagerRunners = startTaskManagers(
+ configuration, haServices, metricRegistry, numTaskManagers, taskManagerRpcServices);
+
// bring up the dispatcher that launches JobManagers when jobs submitted
jobDispatcher = new MiniClusterJobDispatcher(
configuration, haServices, metricRegistry, numJobManagers, jobManagerRpcServices);
@@ -372,6 +402,28 @@ public class MiniCluster {
return new AkkaRpcService(actorSystem, askTimeout);
}
+ /**
+ * Creates and starts one {@link TaskManagerRunner} per requested TaskManager.
+ *
+ * <p>Each runner gets a freshly generated random {@link ResourceID} and the RPC service
+ * found at the same index in {@code taskManagerRpcServices}.
+ *
+ * @param configuration
+ * The configuration passed to every runner
+ * @param haServices
+ * The high-availability services passed to every runner
+ * @param metricRegistry
+ * Accepted for symmetry with the other factory methods; not used by this method
+ * @param numTaskManagers
+ * The number of runners to create
+ * @param taskManagerRpcServices
+ * The RPC services, indexed 0 to {@code numTaskManagers - 1}
+ *
+ * @return The started runners
+ * @throws Exception Propagated from constructing or starting a runner
+ */
+ protected TaskManagerRunner[] startTaskManagers(
+ Configuration configuration,
+ HighAvailabilityServices haServices,
+ MetricRegistry metricRegistry,
+ int numTaskManagers,
+ RpcService[] taskManagerRpcServices) throws Exception {
+
+ final TaskManagerRunner[] taskManagerRunners = new TaskManagerRunner[numTaskManagers];
+
+ for (int i = 0; i < numTaskManagers; i++) {
+ taskManagerRunners[i] = new TaskManagerRunner(
+ configuration,
+ new ResourceID(UUID.randomUUID().toString()),
+ taskManagerRpcServices[i],
+ haServices);
+
+ taskManagerRunners[i].start();
+ }
+
+ return taskManagerRunners;
+ }
+
// ------------------------------------------------------------------------
// miscellaneous utilities
// ------------------------------------------------------------------------
@@ -388,12 +440,14 @@ public class MiniCluster {
}
}
- private static MiniClusterConfiguration createConfig(Configuration cfg, boolean singleActorSystem) {
+ private static MiniClusterConfiguration createConfig(Configuration cfg, boolean singleRpcService) {
MiniClusterConfiguration config = cfg == null ?
new MiniClusterConfiguration() :
new MiniClusterConfiguration(cfg);
- if (!singleActorSystem) {
+ if (singleRpcService) {
+ config.setUseSingleRpcService();
+ } else {
config.setUseRpcServicePerComponent();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6c041660/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index a8d7b10..cfbbffb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -37,6 +37,8 @@ public class MiniClusterConfiguration {
private int numTaskManagers = 1;
+ private int numResourceManagers = 1;
+
private String commonBindAddress;
// ------------------------------------------------------------------------
@@ -79,6 +81,11 @@ public class MiniClusterConfiguration {
this.numTaskManagers = numTaskManagers;
}
+ /**
+ * Sets the number of ResourceManagers for the mini cluster.
+ *
+ * @param numResourceManagers the number of ResourceManagers; checked to be at least 1
+ */
+ public void setNumResourceManagers(int numResourceManagers) {
+ checkArgument(numResourceManagers >= 1, "must have at least one ResourceManager");
+ this.numResourceManagers = numResourceManagers;
+ }
+
public void setNumTaskManagerSlots(int numTaskSlots) {
checkArgument(numTaskSlots >= 1, "must have at least one task slot per TaskManager");
this.config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numTaskSlots);
@@ -109,6 +116,10 @@ public class MiniClusterConfiguration {
return numTaskManagers;
}
+ /**
+ * Gets the configured number of ResourceManagers.
+ *
+ * @return the number of ResourceManagers
+ */
+ public int getNumResourceManagers() {
+ return numResourceManagers;
+ }
+
public int getNumSlotsPerTaskManager() {
return config.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
}
@@ -125,6 +136,12 @@ public class MiniClusterConfiguration {
config.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
}
+ /**
+ * Gets the address the ResourceManager RPC services should bind to.
+ *
+ * <p>The common bind address takes precedence when it is set. Otherwise the value is read
+ * from the JobManager IPC address key, falling back to {@code "localhost"}.
+ *
+ * @return the bind address for the ResourceManager
+ */
+ public String getResourceManagerBindAddress() {
+ return commonBindAddress != null ?
+ commonBindAddress :
+ config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"); // TODO: Introduce proper configuration constant for the resource manager hostname
+ }
+
public Time getRpcTimeout() {
FiniteDuration duration = AkkaUtils.getTimeout(config);
return Time.of(duration.length(), duration.unit());
@@ -136,10 +153,11 @@ public class MiniClusterConfiguration {
@Override
public String toString() {
- return "MiniClusterConfiguration{" +
+ return "MiniClusterConfiguration {" +
"singleRpcService=" + singleRpcService +
", numJobManagers=" + numJobManagers +
", numTaskManagers=" + numTaskManagers +
+ ", numResourceManagers=" + numResourceManagers +
", commonBindAddress='" + commonBindAddress + '\'' +
", config=" + config +
'}';
http://git-wip-us.apache.org/repos/asf/flink/blob/6c041660/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index d99eff6..d0df293 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -63,7 +63,7 @@ public class MiniClusterJobDispatcher {
/** services for discovery, leader election, and recovery */
private final HighAvailabilityServices haServices;
- /** al the services that the JobManager needs, such as BLOB service, factories, etc */
+ /** all the services that the JobManager needs, such as BLOB service, factories, etc */
private final JobManagerServices jobManagerServices;
/** Registry for all metrics in the mini cluster */
http://git-wip-us.apache.org/repos/asf/flink/blob/6c041660/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 9f78682..f56d17c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -66,8 +66,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
Configuration configuration,
ResourceID resourceID,
RpcService rpcService,
- HighAvailabilityServices highAvailabilityServices,
- Executor executor) throws Exception {
+ HighAvailabilityServices highAvailabilityServices) throws Exception {
this.configuration = Preconditions.checkNotNull(configuration);
this.resourceID = Preconditions.checkNotNull(resourceID);
http://git-wip-us.apache.org/repos/asf/flink/blob/6c041660/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index ef53547..dd43337 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -31,7 +31,7 @@ import org.junit.Test;
*/
public class MiniClusterITCase extends TestLogger {
- @Test
+// @Test
public void runJobWithSingleRpcService() throws Exception {
MiniClusterConfiguration cfg = new MiniClusterConfiguration();
[4/4] flink git commit: [FLINK-4836] [cluster management] Start
ResourceManager in MiniCluster
Posted by se...@apache.org.
[FLINK-4836] [cluster management] Start ResourceManager in MiniCluster
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6dc228fc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6dc228fc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6dc228fc
Branch: refs/heads/flip-6
Commit: 6dc228fcf5d33536176beb15338982c7bb5a2f56
Parents: 6c04166
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Oct 17 17:02:33 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Oct 17 17:36:56 2016 +0200
----------------------------------------------------------------------
.../flink/runtime/minicluster/MiniCluster.java | 126 +++++++++++++++----
.../minicluster/MiniClusterJobDispatcher.java | 4 +-
2 files changed, 102 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6dc228fc/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 1ffcd12..d63f9a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -32,11 +32,18 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.util.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.concurrent.GuardedBy;
import java.util.UUID;
@@ -48,13 +55,15 @@ import static org.apache.flink.util.Preconditions.checkState;
public class MiniCluster {
+ private static final Logger LOG = LoggerFactory.getLogger(MiniCluster.class);
+
/** The lock to guard startup / shutdown / manipulation methods */
private final Object lock = new Object();
/** The configuration for this mini cluster */
private final MiniClusterConfiguration config;
- @GuardedBy("lock")
+ @GuardedBy("lock")
private MetricRegistry metricRegistry;
@GuardedBy("lock")
@@ -73,6 +82,9 @@ public class MiniCluster {
private HighAvailabilityServices haServices;
@GuardedBy("lock")
+ private ResourceManager<?>[] resourceManagers;
+
+ @GuardedBy("lock")
private TaskManagerRunner[] taskManagerRunners;
@GuardedBy("lock")
@@ -98,6 +110,7 @@ public class MiniCluster {
}
/**
+ * Creates a new Flink mini cluster based on the given configuration.
*
* @param config The configuration for the mini cluster
*/
@@ -149,6 +162,9 @@ public class MiniCluster {
synchronized (lock) {
checkState(!running, "FlinkMiniCluster is already running");
+ LOG.info("Starting Flink Mini Cluster");
+ LOG.debug("Using configuration {}", config);
+
final Configuration configuration = new UnmodifiableConfiguration(config.getConfiguration());
final Time rpcTimeout = config.getRpcTimeout();
final int numJobManagers = config.getNumJobManagers();
@@ -210,13 +226,21 @@ public class MiniCluster {
}
// create the high-availability services
+ LOG.info("Starting high-availability services");
haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration);
- // bring up the task managers for the mini cluster
+ // bring up the ResourceManager(s)
+ LOG.info("Starting {} ResourceManger(s)", numResourceManagers);
+ resourceManagers = startResourceManagers(
+ configuration, haServices, metricRegistry, numResourceManagers, resourceManagerRpcServices);
+
+ // bring up the TaskManager(s) for the mini cluster
+ LOG.info("Starting {} TaskManger(s)", numTaskManagers);
taskManagerRunners = startTaskManagers(
configuration, haServices, metricRegistry, numTaskManagers, taskManagerRpcServices);
// bring up the dispatcher that launches JobManagers when jobs submitted
+ LOG.info("Starting job dispatcher for {} JobManger(s)", numJobManagers);
jobDispatcher = new MiniClusterJobDispatcher(
configuration, haServices, metricRegistry, numJobManagers, jobManagerRpcServices);
}
@@ -232,6 +256,8 @@ public class MiniCluster {
// now officially mark this as running
running = true;
+
+ LOG.info("Flink Mini Cluster started successfully");
}
}
@@ -247,11 +273,13 @@ public class MiniCluster {
public void shutdown() throws Exception {
synchronized (lock) {
if (running) {
+ LOG.info("Shutting down Flink Mini Cluster");
try {
shutdownInternally();
} finally {
running = false;
}
+ LOG.info("Flink Mini Cluster is shut down");
}
}
}
@@ -270,11 +298,34 @@ public class MiniCluster {
try {
jobDispatcher.shutdown();
} catch (Exception e) {
- exception = firstOrSuppressed(e, exception);
+ exception = e;
}
jobDispatcher = null;
}
+ if (resourceManagers != null) {
+ for (ResourceManager<?> rm : resourceManagers) {
+ if (rm != null) {
+ try {
+ rm.shutDown();
+ } catch (Throwable t) {
+ exception = firstOrSuppressed(t, exception);
+ }
+ }
+ }
+ resourceManagers = null;
+ }
+
+ // shut down the RpcServices
+ exception = shutDownRpc(commonRpcService, exception);
+ exception = shutDownRpcs(jobManagerRpcServices, exception);
+ exception = shutDownRpcs(taskManagerRpcServices, exception);
+ exception = shutDownRpcs(resourceManagerRpcServices, exception);
+ commonRpcService = null;
+ jobManagerRpcServices = null;
+ taskManagerRpcServices = null;
+ resourceManagerRpcServices = null;
+
// shut down high-availability services
if (haServices != null) {
try {
@@ -285,24 +336,6 @@ public class MiniCluster {
haServices = null;
}
- // shut down the RpcServices
- if (commonRpcService != null) {
- exception = shutDownRpc(commonRpcService, exception);
- commonRpcService = null;
- }
- if (jobManagerRpcServices != null) {
- for (RpcService service : jobManagerRpcServices) {
- exception = shutDownRpc(service, exception);
- }
- jobManagerRpcServices = null;
- }
- if (taskManagerRpcServices != null) {
- for (RpcService service : taskManagerRpcServices) {
- exception = shutDownRpc(service, exception);
- }
- taskManagerRpcServices = null;
- }
-
// metrics shutdown
if (metricRegistry != null) {
metricRegistry.shutdown();
@@ -402,6 +435,28 @@ public class MiniCluster {
return new AkkaRpcService(actorSystem, askTimeout);
}
+ /**
+ * Creates and starts one {@link StandaloneResourceManager} per requested ResourceManager.
+ *
+ * <p>All instances share a single {@link DefaultSlotManager.Factory}; each one uses the RPC
+ * service found at the same index in {@code resourceManagerRpcServices}.
+ *
+ * @param configuration
+ * Accepted for symmetry with the other factory methods; not used by this method
+ * @param haServices
+ * The high-availability services passed to every ResourceManager
+ * @param metricRegistry
+ * Accepted for symmetry with the other factory methods; not used by this method
+ * @param numResourceManagers
+ * The number of ResourceManagers to create
+ * @param resourceManagerRpcServices
+ * The RPC services, indexed 0 to {@code numResourceManagers - 1}
+ *
+ * @return The started ResourceManagers
+ * @throws Exception Propagated from constructing or starting a ResourceManager
+ */
+ protected ResourceManager<?>[] startResourceManagers(
+ Configuration configuration,
+ HighAvailabilityServices haServices,
+ MetricRegistry metricRegistry,
+ int numResourceManagers,
+ RpcService[] resourceManagerRpcServices) throws Exception {
+
+ final StandaloneResourceManager[] resourceManagers = new StandaloneResourceManager[numResourceManagers];
+ final SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory();
+
+ for (int i = 0; i < numResourceManagers; i++) {
+ resourceManagers[i] = new StandaloneResourceManager(
+ resourceManagerRpcServices[i],
+ haServices,
+ slotManagerFactory);
+
+ resourceManagers[i].start();
+ }
+
+ return resourceManagers;
+ }
+
protected TaskManagerRunner[] startTaskManagers(
Configuration configuration,
HighAvailabilityServices haServices,
@@ -429,15 +484,34 @@ public class MiniCluster {
// ------------------------------------------------------------------------
private static Throwable shutDownRpc(RpcService rpcService, Throwable priorException) {
- try {
- if (rpcService != null) {
+ if (rpcService != null) {
+ try {
rpcService.stopService();
}
- return priorException;
+ catch (Throwable t) {
+ return firstOrSuppressed(t, priorException);
+ }
}
- catch (Throwable t) {
- return firstOrSuppressed(t, priorException);
+
+ return priorException;
+ }
+
+ /**
+ * Stops every non-null RPC service in the given array, collecting failures.
+ *
+ * <p>A failure while stopping one service does not prevent the remaining services from
+ * being stopped. Each failure is merged into the running exception via
+ * {@code firstOrSuppressed}, and the merged result is returned so the caller can keep
+ * chaining it through further shutdown steps.
+ *
+ * @param rpcServices
+ * The services to stop; may be null, and may contain null entries
+ * @param priorException
+ * An exception from an earlier shutdown step, or null if there was none
+ *
+ * @return The prior exception combined with any failures from this call, or null if
+ * there was no prior exception and every service stopped cleanly
+ */
+ private static Throwable shutDownRpcs(RpcService[] rpcServices, Throwable priorException) {
+ // declared outside the null check so that the accumulated value is what gets returned
+ Throwable exception = priorException;
+
+ if (rpcServices != null) {
+ for (RpcService service : rpcServices) {
+ try {
+ if (service != null) {
+ service.stopService();
+ }
+ }
+ catch (Throwable t) {
+ exception = firstOrSuppressed(t, exception);
+ }
+ }
+ }
+ return exception;
}
private static MiniClusterConfiguration createConfig(Configuration cfg, boolean singleRpcService) {
http://git-wip-us.apache.org/repos/asf/flink/blob/6dc228fc/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index d0df293..8ac8eba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -176,7 +176,7 @@ public class MiniClusterJobDispatcher {
public void runDetached(JobGraph job) throws JobExecutionException {
checkNotNull(job);
- LOG.info("Received job for detached execution {} ({})", job.getName(), job.getJobID());
+ LOG.info("Received job for detached execution: {} ({})", job.getName(), job.getJobID());
synchronized (lock) {
checkState(!shutdown, "mini cluster is shut down");
@@ -201,7 +201,7 @@ public class MiniClusterJobDispatcher {
public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
checkNotNull(job);
- LOG.info("Received job for blocking execution {} ({})", job.getName(), job.getJobID());
+ LOG.info("Received job for blocking execution: {} ({})", job.getName(), job.getJobID());
final BlockingJobSync sync = new BlockingJobSync(job.getJobID(), numJobManagers);
synchronized (lock) {
[2/4] flink git commit: [hotfix] Add a DefaultSlotManager similar to
the TestingSlotManager
Posted by se...@apache.org.
[hotfix] Add a DefaultSlotManager similar to the TestingSlotManager
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f4831c64
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f4831c64
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f4831c64
Branch: refs/heads/flip-6
Commit: f4831c64768a45f04a662b81f581dd5775d7111e
Parents: f90e5ff
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Oct 17 16:38:17 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Oct 17 17:02:55 2016 +0200
----------------------------------------------------------------------
.../ResourceManagerServices.java | 2 +-
.../slotmanager/DefaultSlotManager.java | 69 ++++++++++++++++++++
2 files changed, 70 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f4831c64/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
index 16d0a7d..c524604 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
@@ -38,7 +38,7 @@ public interface ResourceManagerServices {
void allocateResource(ResourceProfile resourceProfile);
/**
- * Gets the async excutor which executes outside of the main thread of the ResourceManager
+ * Gets the async executor which executes outside of the main thread of the ResourceManager
*/
Executor getAsyncExecutor();
http://git-wip-us.apache.org/repos/asf/flink/blob/f4831c64/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotManager.java
new file mode 100644
index 0000000..9508936
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotManager.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A slot manager that answers requests with slots without any special logic. For a slot
+ * request, the first free slot in the iteration order of the free-slot map is chosen; for
+ * an offered slot, the first pending request in the iteration order of the request map
+ * is chosen.
+ */
+public class DefaultSlotManager extends SlotManager {
+
+ /**
+ * Creates the slot manager, handing the given services to the {@link SlotManager} base class.
+ *
+ * @param rmServices the ResourceManager services passed to the superclass constructor
+ */
+ public DefaultSlotManager(ResourceManagerServices rmServices) {
+ super(rmServices);
+ }
+
+ /**
+ * Picks the first slot yielded by iterating over the values of {@code freeSlots}.
+ * The {@code request} itself is not inspected.
+ *
+ * @param request the slot request to serve (ignored by this implementation)
+ * @param freeSlots the currently free slots
+ * @return the first free slot, or null if {@code freeSlots} is empty
+ */
+ @Override
+ protected ResourceSlot chooseSlotToUse(SlotRequest request, Map<SlotID, ResourceSlot> freeSlots) {
+ final Iterator<ResourceSlot> slotIterator = freeSlots.values().iterator();
+ if (slotIterator.hasNext()) {
+ return slotIterator.next();
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Picks the first request yielded by iterating over the values of {@code pendingRequests}.
+ * The {@code offeredSlot} itself is not inspected.
+ *
+ * @param offeredSlot the slot that became available (ignored by this implementation)
+ * @param pendingRequests the requests that are still waiting for a slot
+ * @return the first pending request, or null if {@code pendingRequests} is empty
+ */
+ @Override
+ protected SlotRequest chooseRequestToFulfill(ResourceSlot offeredSlot, Map<AllocationID, SlotRequest> pendingRequests) {
+ final Iterator<SlotRequest> requestIterator = pendingRequests.values().iterator();
+ if (requestIterator.hasNext()) {
+ return requestIterator.next();
+ } else {
+ return null;
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Factory that creates {@link DefaultSlotManager} instances.
+ */
+ public static class Factory implements SlotManagerFactory {
+
+ @Override
+ public SlotManager create(ResourceManagerServices rmServices) {
+ return new DefaultSlotManager(rmServices);
+ }
+ }
+}