You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/17 18:16:50 UTC

[1/4] flink git commit: [hotfix] [cluster management] Remove scala dependencies from MiniCluster.java

Repository: flink
Updated Branches:
  refs/heads/flip-6 4f891a6c2 -> 6dc228fcf


[hotfix] [cluster management] Remove scala dependencies from MiniCluster.java


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f90e5ff2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f90e5ff2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f90e5ff2

Branch: refs/heads/flip-6
Commit: f90e5ff2deaf5eea350ca4a57d8cf77ceb4703b7
Parents: 4f891a6
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Oct 17 14:41:52 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Oct 17 15:51:24 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/minicluster/MiniCluster.java     | 12 ++++--------
 .../org/apache/flink/runtime/akka/AkkaUtils.scala  | 17 +++++++++++++++++
 .../runtime/minicluster/MiniClusterITCase.java     |  2 +-
 3 files changed, 22 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f90e5ff2/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 1ee38e0..d85234d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -35,9 +35,6 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.util.ExceptionUtils;
 
-import scala.Option;
-import scala.Tuple2;
-
 import javax.annotation.concurrent.GuardedBy;
 
 import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
@@ -348,7 +345,7 @@ public class MiniCluster {
 	/**
 	 * Factory method to instantiate the RPC service.
 	 * 
-	 * @param config
+	 * @param configuration
 	 *            The configuration of the mini cluster
 	 * @param askTimeout
 	 *            The default RPC timeout for asynchronous "ask" requests.
@@ -360,17 +357,16 @@ public class MiniCluster {
 	 * @return The instantiated RPC service
 	 */
 	protected RpcService createRpcService(
-			Configuration config,
+			Configuration configuration,
 			Time askTimeout,
 			boolean remoteEnabled,
 			String bindAddress) {
 
 		ActorSystem actorSystem;
 		if (remoteEnabled) {
-			Tuple2<String, Object> remoteSettings = new Tuple2<String, Object>(bindAddress, 0);
-			actorSystem = AkkaUtils.createActorSystem(config, Option.apply(remoteSettings));
+			actorSystem = AkkaUtils.createActorSystem(configuration, bindAddress, 0);
 		} else {
-			actorSystem = AkkaUtils.createLocalActorSystem(config);
+			actorSystem = AkkaUtils.createLocalActorSystem(configuration);
 		}
 
 		return new AkkaRpcService(actorSystem, askTimeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/f90e5ff2/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 2461340..d98d90a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -55,6 +55,23 @@ object AkkaUtils {
   }
 
   /**
+    * Creates an actor system bound to the given hostname and port.
+    *
+    * @param configuration instance containing the user provided configuration values
+    * @param hostname of the network interface to bind to
+    * @param port of to bind to
+    * @return created actor system
+    */
+  def createActorSystem(
+      configuration: Configuration,
+      hostname: String,
+      port: Int)
+    : ActorSystem = {
+
+    createActorSystem(configuration, Some((hostname, port)))
+  }
+
+  /**
    * Creates an actor system. If a listening address is specified, then the actor system will listen
    * on that address for messages from a remote actor system. If not, then a local actor system
    * will be instantiated.

http://git-wip-us.apache.org/repos/asf/flink/blob/f90e5ff2/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index dd43337..ef53547 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -31,7 +31,7 @@ import org.junit.Test;
  */
 public class MiniClusterITCase extends TestLogger {
 
-//	@Test
+	@Test
 	public void runJobWithSingleRpcService() throws Exception {
 		MiniClusterConfiguration cfg = new MiniClusterConfiguration();
 


[3/4] flink git commit: [FLINK-4836] [cluster management] Start ResourceManager and TaskManager services in MiniCluster

Posted by se...@apache.org.
[FLINK-4836] [cluster management] Start ResourceManager and TaskManager services in MiniCluster


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c041660
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c041660
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c041660

Branch: refs/heads/flip-6
Commit: 6c041660912e99f6bf3e667cc497f7c839f1d10c
Parents: f4831c6
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Oct 17 15:16:59 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Oct 17 17:02:55 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/minicluster/MiniCluster.java  | 58 +++++++++++++++++++-
 .../minicluster/MiniClusterConfiguration.java   | 20 ++++++-
 .../minicluster/MiniClusterJobDispatcher.java   |  2 +-
 .../runtime/taskexecutor/TaskManagerRunner.java |  3 +-
 .../runtime/minicluster/MiniClusterITCase.java  |  2 +-
 5 files changed, 78 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6c041660/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index d85234d..1ffcd12 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -33,10 +34,13 @@ import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.util.ExceptionUtils;
 
 import javax.annotation.concurrent.GuardedBy;
 
+import java.util.UUID;
+
 import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -63,9 +67,15 @@ public class MiniCluster {
 	private RpcService[] taskManagerRpcServices;
 
 	@GuardedBy("lock")
+	private RpcService[] resourceManagerRpcServices;
+
+	@GuardedBy("lock")
 	private HighAvailabilityServices haServices;
 
 	@GuardedBy("lock")
+	private TaskManagerRunner[] taskManagerRunners;
+
+	@GuardedBy("lock")
 	private MiniClusterJobDispatcher jobDispatcher;
 
 	/** Flag marking the mini cluster as started/running */
@@ -143,6 +153,7 @@ public class MiniCluster {
 			final Time rpcTimeout = config.getRpcTimeout();
 			final int numJobManagers = config.getNumJobManagers();
 			final int numTaskManagers = config.getNumTaskManagers();
+			final int numResourceManagers = config.getNumResourceManagers();
 			final boolean singleRpc = config.getUseSingleRpcSystem();
 
 			try {
@@ -150,6 +161,7 @@ public class MiniCluster {
 
 				RpcService[] jobManagerRpcServices = new RpcService[numJobManagers];
 				RpcService[] taskManagerRpcServices = new RpcService[numTaskManagers];
+				RpcService[] resourceManagerRpcServices = new RpcService[numResourceManagers];
 
 				// bring up all the RPC services
 				if (singleRpc) {
@@ -163,11 +175,19 @@ public class MiniCluster {
 					for (int i = 0; i < numTaskManagers; i++) {
 						taskManagerRpcServices[i] = commonRpcService;
 					}
+					for (int i = 0; i < numResourceManagers; i++) {
+						resourceManagerRpcServices[i] = commonRpcService;
+					}
+
+					this.resourceManagerRpcServices = null;
+					this.jobManagerRpcServices = null;
+					this.taskManagerRpcServices = null;
 				}
 				else {
 					// start a new service per component, possibly with custom bind addresses
 					final String jobManagerBindAddress = config.getJobManagerBindAddress();
 					final String taskManagerBindAddress = config.getTaskManagerBindAddress();
+					final String resourceManagerBindAddress = config.getResourceManagerBindAddress();
 
 					for (int i = 0; i < numJobManagers; i++) {
 						jobManagerRpcServices[i] = createRpcService(
@@ -179,13 +199,23 @@ public class MiniCluster {
 								configuration, rpcTimeout, true, taskManagerBindAddress);
 					}
 
+					for (int i = 0; i < numResourceManagers; i++) {
+						resourceManagerRpcServices[i] = createRpcService(
+								configuration, rpcTimeout, true, resourceManagerBindAddress);
+					}
+
 					this.jobManagerRpcServices = jobManagerRpcServices;
 					this.taskManagerRpcServices = taskManagerRpcServices;
+					this.resourceManagerRpcServices = resourceManagerRpcServices;
 				}
 
 				// create the high-availability services
 				haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration);
 
+				// bring up the task managers for the mini cluster
+				taskManagerRunners = startTaskManagers(
+						configuration, haServices, metricRegistry, numTaskManagers, taskManagerRpcServices);
+
 				// bring up the dispatcher that launches JobManagers when jobs submitted
 				jobDispatcher = new MiniClusterJobDispatcher(
 						configuration, haServices, metricRegistry, numJobManagers, jobManagerRpcServices);
@@ -372,6 +402,28 @@ public class MiniCluster {
 		return new AkkaRpcService(actorSystem, askTimeout);
 	}
 
+	protected TaskManagerRunner[] startTaskManagers(
+			Configuration configuration,
+			HighAvailabilityServices haServices,
+			MetricRegistry metricRegistry,
+			int numTaskManagers,
+			RpcService[] taskManagerRpcServices) throws Exception {
+
+		final TaskManagerRunner[] taskManagerRunners = new TaskManagerRunner[numTaskManagers];
+
+		for (int i = 0; i < numTaskManagers; i++) {
+			taskManagerRunners[i] = new TaskManagerRunner(
+				configuration,
+				new ResourceID(UUID.randomUUID().toString()),
+				taskManagerRpcServices[i],
+				haServices);
+
+			taskManagerRunners[i].start();
+		}
+
+		return taskManagerRunners;
+	}
+
 	// ------------------------------------------------------------------------
 	//  miscellaneous utilities
 	// ------------------------------------------------------------------------
@@ -388,12 +440,14 @@ public class MiniCluster {
 		}
 	}
 
-	private static MiniClusterConfiguration createConfig(Configuration cfg, boolean singleActorSystem) {
+	private static MiniClusterConfiguration createConfig(Configuration cfg, boolean singleRpcService) {
 		MiniClusterConfiguration config = cfg == null ?
 				new MiniClusterConfiguration() :
 				new MiniClusterConfiguration(cfg);
 
-		if (!singleActorSystem) {
+		if (singleRpcService) {
+			config.setUseSingleRpcService();
+		} else {
 			config.setUseRpcServicePerComponent();
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6c041660/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index a8d7b10..cfbbffb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -37,6 +37,8 @@ public class MiniClusterConfiguration {
 
 	private int numTaskManagers = 1;
 
+	private int numResourceManagers = 1;
+
 	private String commonBindAddress;
 
 	// ------------------------------------------------------------------------
@@ -79,6 +81,11 @@ public class MiniClusterConfiguration {
 		this.numTaskManagers = numTaskManagers;
 	}
 
+	public void setNumResourceManagers(int numResourceManagers) {
+		checkArgument(numResourceManagers >= 1, "must have at least one ResourceManager");
+		this.numResourceManagers = numResourceManagers;
+	}
+
 	public void setNumTaskManagerSlots(int numTaskSlots) {
 		checkArgument(numTaskSlots >= 1, "must have at least one task slot per TaskManager");
 		this.config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numTaskSlots);
@@ -109,6 +116,10 @@ public class MiniClusterConfiguration {
 		return numTaskManagers;
 	}
 
+	public int getNumResourceManagers() {
+		return numResourceManagers;
+	}
+
 	public int getNumSlotsPerTaskManager() {
 		return config.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
 	}
@@ -125,6 +136,12 @@ public class MiniClusterConfiguration {
 				config.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
 	}
 
+	public String getResourceManagerBindAddress() {
+		return commonBindAddress != null ?
+			commonBindAddress :
+			config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"); // TODO: Introduce proper configuration constant for the resource manager hostname
+	}
+
 	public Time getRpcTimeout() {
 		FiniteDuration duration = AkkaUtils.getTimeout(config);
 		return Time.of(duration.length(), duration.unit());
@@ -136,10 +153,11 @@ public class MiniClusterConfiguration {
 
 	@Override
 	public String toString() {
-		return "MiniClusterConfiguration{" +
+		return "MiniClusterConfiguration {" +
 				"singleRpcService=" + singleRpcService +
 				", numJobManagers=" + numJobManagers +
 				", numTaskManagers=" + numTaskManagers +
+				", numResourceManagers=" + numResourceManagers +
 				", commonBindAddress='" + commonBindAddress + '\'' +
 				", config=" + config +
 				'}';

http://git-wip-us.apache.org/repos/asf/flink/blob/6c041660/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index d99eff6..d0df293 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -63,7 +63,7 @@ public class MiniClusterJobDispatcher {
 	/** services for discovery, leader election, and recovery */
 	private final HighAvailabilityServices haServices;
 
-	/** al the services that the JobManager needs, such as BLOB service, factories, etc */
+	/** all the services that the JobManager needs, such as BLOB service, factories, etc */
 	private final JobManagerServices jobManagerServices;
 
 	/** Registry for all metrics in the mini cluster */

http://git-wip-us.apache.org/repos/asf/flink/blob/6c041660/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 9f78682..f56d17c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -66,8 +66,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
 		Configuration configuration,
 		ResourceID resourceID,
 		RpcService rpcService,
-		HighAvailabilityServices highAvailabilityServices,
-		Executor executor) throws Exception {
+		HighAvailabilityServices highAvailabilityServices) throws Exception {
 
 		this.configuration = Preconditions.checkNotNull(configuration);
 		this.resourceID = Preconditions.checkNotNull(resourceID);

http://git-wip-us.apache.org/repos/asf/flink/blob/6c041660/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index ef53547..dd43337 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -31,7 +31,7 @@ import org.junit.Test;
  */
 public class MiniClusterITCase extends TestLogger {
 
-	@Test
+//	@Test
 	public void runJobWithSingleRpcService() throws Exception {
 		MiniClusterConfiguration cfg = new MiniClusterConfiguration();
 


[4/4] flink git commit: [FLINK-4836] [cluster management] Start ResourceManager in MiniCluster

Posted by se...@apache.org.
[FLINK-4836] [cluster management] Start ResourceManager in MiniCluster


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6dc228fc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6dc228fc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6dc228fc

Branch: refs/heads/flip-6
Commit: 6dc228fcf5d33536176beb15338982c7bb5a2f56
Parents: 6c04166
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Oct 17 17:02:33 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Oct 17 17:36:56 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/minicluster/MiniCluster.java  | 126 +++++++++++++++----
 .../minicluster/MiniClusterJobDispatcher.java   |   4 +-
 2 files changed, 102 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6dc228fc/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 1ffcd12..d63f9a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -32,11 +32,18 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.util.ExceptionUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.concurrent.GuardedBy;
 
 import java.util.UUID;
@@ -48,13 +55,15 @@ import static org.apache.flink.util.Preconditions.checkState;
 
 public class MiniCluster {
 
+	private static final Logger LOG = LoggerFactory.getLogger(MiniCluster.class);
+
 	/** The lock to guard startup / shutdown / manipulation methods */
 	private final Object lock = new Object();
 
 	/** The configuration for this mini cluster */
 	private final MiniClusterConfiguration config;
 
-	@GuardedBy("lock")
+	@GuardedBy("lock") 
 	private MetricRegistry metricRegistry;
 
 	@GuardedBy("lock")
@@ -73,6 +82,9 @@ public class MiniCluster {
 	private HighAvailabilityServices haServices;
 
 	@GuardedBy("lock")
+	private ResourceManager<?>[] resourceManagers;
+
+	@GuardedBy("lock")
 	private TaskManagerRunner[] taskManagerRunners;
 
 	@GuardedBy("lock")
@@ -98,6 +110,7 @@ public class MiniCluster {
 	}
 
 	/**
+	 * Creates a new Flink mini cluster based on the given configuration.
 	 * 
 	 * @param config The configuration for the mini cluster
 	 */
@@ -149,6 +162,9 @@ public class MiniCluster {
 		synchronized (lock) {
 			checkState(!running, "FlinkMiniCluster is already running");
 
+			LOG.info("Starting Flink Mini Cluster");
+			LOG.debug("Using configuration {}", config);
+
 			final Configuration configuration = new UnmodifiableConfiguration(config.getConfiguration());
 			final Time rpcTimeout = config.getRpcTimeout();
 			final int numJobManagers = config.getNumJobManagers();
@@ -210,13 +226,21 @@ public class MiniCluster {
 				}
 
 				// create the high-availability services
+				LOG.info("Starting high-availability services");
 				haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration);
 
-				// bring up the task managers for the mini cluster
+				// bring up the ResourceManager(s)
+				LOG.info("Starting {} ResourceManger(s)", numResourceManagers);
+				resourceManagers = startResourceManagers(
+						configuration, haServices, metricRegistry, numResourceManagers, resourceManagerRpcServices);
+
+				// bring up the TaskManager(s) for the mini cluster
+				LOG.info("Starting {} TaskManger(s)", numTaskManagers);
 				taskManagerRunners = startTaskManagers(
 						configuration, haServices, metricRegistry, numTaskManagers, taskManagerRpcServices);
 
 				// bring up the dispatcher that launches JobManagers when jobs submitted
+				LOG.info("Starting job dispatcher for {} JobManger(s)", numJobManagers);
 				jobDispatcher = new MiniClusterJobDispatcher(
 						configuration, haServices, metricRegistry, numJobManagers, jobManagerRpcServices);
 			}
@@ -232,6 +256,8 @@ public class MiniCluster {
 
 			// now officially mark this as running
 			running = true;
+
+			LOG.info("Flink Mini Cluster started successfully");
 		}
 	}
 
@@ -247,11 +273,13 @@ public class MiniCluster {
 	public void shutdown() throws Exception {
 		synchronized (lock) {
 			if (running) {
+				LOG.info("Shutting down Flink Mini Cluster");
 				try {
 					shutdownInternally();
 				} finally {
 					running = false;
 				}
+				LOG.info("Flink Mini Cluster is shut down");
 			}
 		}
 	}
@@ -270,11 +298,34 @@ public class MiniCluster {
 			try {
 				jobDispatcher.shutdown();
 			} catch (Exception e) {
-				exception = firstOrSuppressed(e, exception);
+				exception = e;
 			}
 			jobDispatcher = null;
 		}
 
+		if (resourceManagers != null) {
+			for (ResourceManager<?> rm : resourceManagers) {
+				if (rm != null) {
+					try {
+						rm.shutDown();
+					} catch (Throwable t) {
+						exception = firstOrSuppressed(t, exception);
+					}
+				}
+			}
+			resourceManagers = null;
+		}
+
+		// shut down the RpcServices
+		exception = shutDownRpc(commonRpcService, exception);
+		exception = shutDownRpcs(jobManagerRpcServices, exception);
+		exception = shutDownRpcs(taskManagerRpcServices, exception);
+		exception = shutDownRpcs(resourceManagerRpcServices, exception);
+		commonRpcService = null;
+		jobManagerRpcServices = null;
+		taskManagerRpcServices = null;
+		resourceManagerRpcServices = null;
+
 		// shut down high-availability services
 		if (haServices != null) {
 			try {
@@ -285,24 +336,6 @@ public class MiniCluster {
 			haServices = null;
 		}
 
-		// shut down the RpcServices
-		if (commonRpcService != null) {
-			exception = shutDownRpc(commonRpcService, exception);
-			commonRpcService = null;
-		}
-		if (jobManagerRpcServices != null) {
-			for (RpcService service : jobManagerRpcServices) {
-				exception = shutDownRpc(service, exception);
-			}
-			jobManagerRpcServices = null;
-		}
-		if (taskManagerRpcServices != null) {
-			for (RpcService service : taskManagerRpcServices) {
-				exception = shutDownRpc(service, exception);
-			}
-			taskManagerRpcServices = null;
-		}
-
 		// metrics shutdown
 		if (metricRegistry != null) {
 			metricRegistry.shutdown();
@@ -402,6 +435,28 @@ public class MiniCluster {
 		return new AkkaRpcService(actorSystem, askTimeout);
 	}
 
+	protected ResourceManager<?>[] startResourceManagers(
+			Configuration configuration,
+			HighAvailabilityServices haServices,
+			MetricRegistry metricRegistry,
+			int numResourceManagers,
+			RpcService[] resourceManagerRpcServices) throws Exception {
+
+		final StandaloneResourceManager[] resourceManagers = new StandaloneResourceManager[numResourceManagers];
+		final SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory(); 
+
+		for (int i = 0; i < numResourceManagers; i++) {
+			resourceManagers[i] = new StandaloneResourceManager(
+					resourceManagerRpcServices[i],
+					haServices,
+					slotManagerFactory);
+
+			resourceManagers[i].start();
+		}
+
+		return resourceManagers;
+	}
+
 	protected TaskManagerRunner[] startTaskManagers(
 			Configuration configuration,
 			HighAvailabilityServices haServices,
@@ -429,15 +484,34 @@ public class MiniCluster {
 	// ------------------------------------------------------------------------
 
 	private static Throwable shutDownRpc(RpcService rpcService, Throwable priorException) {
-		try {
-			if (rpcService != null) {
+		if (rpcService != null) {
+			try {
 				rpcService.stopService();
 			}
-			return priorException;
+			catch (Throwable t) {
+				return firstOrSuppressed(t, priorException);
+			}
 		}
-		catch (Throwable t) {
-			return firstOrSuppressed(t, priorException);
+
+		return priorException;
+	}
+
+	private static Throwable shutDownRpcs(RpcService[] rpcServices, Throwable priorException) {
+		if (rpcServices != null) {
+			Throwable exception = priorException;
+
+			for (RpcService service : rpcServices) {
+				try {
+					if (service != null) {
+						service.stopService();
+					}
+				}
+				catch (Throwable t) {
+					exception = firstOrSuppressed(t, exception);
+				}
+			}
 		}
+		return priorException;
 	}
 
 	private static MiniClusterConfiguration createConfig(Configuration cfg, boolean singleRpcService) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6dc228fc/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index d0df293..8ac8eba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -176,7 +176,7 @@ public class MiniClusterJobDispatcher {
 	public void runDetached(JobGraph job) throws JobExecutionException {
 		checkNotNull(job);
 
-		LOG.info("Received job for detached execution {} ({})", job.getName(), job.getJobID());
+		LOG.info("Received job for detached execution: {} ({})", job.getName(), job.getJobID());
 
 		synchronized (lock) {
 			checkState(!shutdown, "mini cluster is shut down");
@@ -201,7 +201,7 @@ public class MiniClusterJobDispatcher {
 	public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
 		checkNotNull(job);
 		
-		LOG.info("Received job for blocking execution {} ({})", job.getName(), job.getJobID());
+		LOG.info("Received job for blocking execution: {} ({})", job.getName(), job.getJobID());
 		final BlockingJobSync sync = new BlockingJobSync(job.getJobID(), numJobManagers);
 
 		synchronized (lock) {


[2/4] flink git commit: [hotfix] Add a DefaultSlotManager similar to the TestingSlotManager

Posted by se...@apache.org.
[hotfix] Add a DefaultSlotManager similar to the TestingSlotManager


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f4831c64
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f4831c64
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f4831c64

Branch: refs/heads/flip-6
Commit: f4831c64768a45f04a662b81f581dd5775d7111e
Parents: f90e5ff
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Oct 17 16:38:17 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Oct 17 17:02:55 2016 +0200

----------------------------------------------------------------------
 .../ResourceManagerServices.java                |  2 +-
 .../slotmanager/DefaultSlotManager.java         | 69 ++++++++++++++++++++
 2 files changed, 70 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f4831c64/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
index 16d0a7d..c524604 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
@@ -38,7 +38,7 @@ public interface ResourceManagerServices {
 	void allocateResource(ResourceProfile resourceProfile);
 
 	/**
-	 * Gets the async excutor which executes outside of the main thread of the ResourceManager
+	 * Gets the async executor which executes outside of the main thread of the ResourceManager
 	 */
 	Executor getAsyncExecutor();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f4831c64/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotManager.java
new file mode 100644
index 0000000..9508936
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotManager.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A slot manager that answers requests with slots without any special logic. The first slot
+ * in the maps to match a request is chosen.
+ */
+public class DefaultSlotManager extends SlotManager {
+
+	public DefaultSlotManager(ResourceManagerServices rmServices) {
+		super(rmServices);
+	}
+
+	@Override
+	protected ResourceSlot chooseSlotToUse(SlotRequest request, Map<SlotID, ResourceSlot> freeSlots) {
+		final Iterator<ResourceSlot> slotIterator = freeSlots.values().iterator();
+		if (slotIterator.hasNext()) {
+			return slotIterator.next();
+		} else {
+			return null;
+		}
+	}
+
+	@Override
+	protected SlotRequest chooseRequestToFulfill(ResourceSlot offeredSlot, Map<AllocationID, SlotRequest> pendingRequests) {
+		final Iterator<SlotRequest> requestIterator = pendingRequests.values().iterator();
+		if (requestIterator.hasNext()) {
+			return requestIterator.next();
+		} else {
+			return null;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	public static class Factory implements SlotManagerFactory {
+
+		@Override
+		public SlotManager create(ResourceManagerServices rmServices) {
+			return new DefaultSlotManager(rmServices);
+		}
+	}
+}