You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/06/22 23:47:30 UTC
[09/22] Rework the Taskmanager to a slot based model and remove
legacy cloud code
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AbstractInstance.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AbstractInstance.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AbstractInstance.java
deleted file mode 100644
index 56b4eae..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AbstractInstance.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.instance;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.Set;
-
-import eu.stratosphere.nephele.deployment.TaskDeploymentDescriptor;
-import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
-import eu.stratosphere.nephele.execution.librarycache.LibraryCacheProfileRequest;
-import eu.stratosphere.nephele.execution.librarycache.LibraryCacheProfileResponse;
-import eu.stratosphere.nephele.execution.librarycache.LibraryCacheUpdate;
-import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
-import eu.stratosphere.nephele.taskmanager.TaskKillResult;
-import eu.stratosphere.runtime.io.channels.ChannelID;
-import eu.stratosphere.nephele.ipc.RPC;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.net.NetUtils;
-import eu.stratosphere.nephele.protocols.TaskOperationProtocol;
-import eu.stratosphere.nephele.taskmanager.TaskCancelResult;
-import eu.stratosphere.nephele.taskmanager.TaskSubmissionResult;
-import eu.stratosphere.nephele.topology.NetworkNode;
-import eu.stratosphere.nephele.topology.NetworkTopology;
-
-/**
- * An abstract instance represents a resource a {@link eu.stratosphere.nephele.taskmanager.TaskManager} runs on.
- *
- */
-public abstract class AbstractInstance extends NetworkNode {
-
- /**
- * The type of the instance.
- */
- private final InstanceType instanceType;
-
- /**
- * The connection info identifying the instance.
- */
- private final InstanceConnectionInfo instanceConnectionInfo;
-
- /**
- * The hardware description as reported by the instance itself.
- */
- private final HardwareDescription hardwareDescription;
-
- /**
- * Stores the RPC stub object for the instance's task manager.
- */
- private TaskOperationProtocol taskManager = null;
-
- /**
- * Constructs an abstract instance object.
- *
- * @param instanceType
- * the type of the instance
- * @param instanceConnectionInfo
- * the connection info identifying the instance
- * @param parentNode
- * the parent node in the network topology
- * @param networkTopology
- * the network topology this node is a part of
- * @param hardwareDescription
- * the hardware description provided by the instance itself
- */
- public AbstractInstance(final InstanceType instanceType, final InstanceConnectionInfo instanceConnectionInfo,
- final NetworkNode parentNode, final NetworkTopology networkTopology,
- final HardwareDescription hardwareDescription) {
- super((instanceConnectionInfo == null) ? null : instanceConnectionInfo.toString(), parentNode, networkTopology);
- this.instanceType = instanceType;
- this.instanceConnectionInfo = instanceConnectionInfo;
- this.hardwareDescription = hardwareDescription;
- }
-
- /**
- * Creates or returns the RPC stub object for the instance's task manager.
- *
- * @return the RPC stub object for the instance's task manager
- * @throws IOException
- * thrown if the RPC stub object for the task manager cannot be created
- */
- private TaskOperationProtocol getTaskManagerProxy() throws IOException {
-
- if (this.taskManager == null) {
-
- this.taskManager = RPC.getProxy(TaskOperationProtocol.class,
- new InetSocketAddress(getInstanceConnectionInfo().address(),
- getInstanceConnectionInfo().ipcPort()), NetUtils.getSocketFactory());
- }
-
- return this.taskManager;
- }
-
- /**
- * Destroys and removes the RPC stub object for this instance's task manager.
- */
- private void destroyTaskManagerProxy() {
-
- if (this.taskManager != null) {
- RPC.stopProxy(this.taskManager);
- this.taskManager = null;
- }
- }
-
- /**
- * Returns the type of the instance.
- *
- * @return the type of the instance
- */
- public final InstanceType getType() {
- return this.instanceType;
- }
-
- /**
- * Returns the instance's connection information object.
- *
- * @return the instance's connection information object
- */
- public final InstanceConnectionInfo getInstanceConnectionInfo() {
- return this.instanceConnectionInfo;
- }
-
- /**
- * Returns the instance's hardware description as reported by the instance itself.
- *
- * @return the instance's hardware description
- */
- public HardwareDescription getHardwareDescription() {
- return this.hardwareDescription;
- }
-
- /**
- * Checks if all the libraries required to run the job with the given
- * job ID are available on this instance. Any libary that is missing
- * is transferred to the instance as a result of this call.
- *
- * @param jobID
- * the ID of the job whose libraries are to be checked for
- * @throws IOException
- * thrown if an error occurs while checking for the libraries
- */
- public synchronized void checkLibraryAvailability(final JobID jobID) throws IOException {
-
- // Now distribute the required libraries for the job
- String[] requiredLibraries = LibraryCacheManager.getRequiredJarFiles(jobID);
-
- if (requiredLibraries == null) {
- throw new IOException("No entry of required libraries for job " + jobID);
- }
-
- LibraryCacheProfileRequest request = new LibraryCacheProfileRequest();
- request.setRequiredLibraries(requiredLibraries);
-
- // Send the request
- LibraryCacheProfileResponse response = null;
- response = getTaskManagerProxy().getLibraryCacheProfile(request);
-
- // Check response and transfer libraries if necessary
- for (int k = 0; k < requiredLibraries.length; k++) {
- if (!response.isCached(k)) {
- LibraryCacheUpdate update = new LibraryCacheUpdate(requiredLibraries[k]);
- getTaskManagerProxy().updateLibraryCache(update);
- }
- }
- }
-
- /**
- * Submits a list of tasks to the instance's {@link eu.stratosphere.nephele.taskmanager.TaskManager}.
- *
- * @param tasks
- * the list of tasks to be submitted
- * @return the result of the submission attempt
- * @throws IOException
- * thrown if an error occurs while transmitting the task
- */
- public synchronized List<TaskSubmissionResult> submitTasks(final List<TaskDeploymentDescriptor> tasks)
- throws IOException {
-
- return getTaskManagerProxy().submitTasks(tasks);
- }
-
- /**
- * Cancels the task identified by the given ID at the instance's
- * {@link eu.stratosphere.nephele.taskmanager.TaskManager}.
- *
- * @param id
- * the ID identifying the task to be canceled
- * @throws IOException
- * thrown if an error occurs while transmitting the request or receiving the response
- * @return the result of the cancel attempt
- */
- public synchronized TaskCancelResult cancelTask(final ExecutionVertexID id) throws IOException {
-
- return getTaskManagerProxy().cancelTask(id);
- }
-
- /**
- * Kills the task identified by the given ID at the instance's
- * {@link eu.stratosphere.nephele.taskmanager.TaskManager}.
- *
- * @param id
- * the ID identifying the task to be killed
- * @throws IOException
- * thrown if an error occurs while transmitting the request or receiving the response
- * @return the result of the kill attempt
- */
- public synchronized TaskKillResult killTask(final ExecutionVertexID id) throws IOException {
-
- return getTaskManagerProxy().killTask(id);
- }
-
- @Override
- public boolean equals(final Object obj) {
-
- // Fall back since dummy instances do not have a instanceConnectionInfo
- if (this.instanceConnectionInfo == null) {
- return super.equals(obj);
- }
-
- if (!(obj instanceof AbstractInstance)) {
- return false;
- }
-
- final AbstractInstance abstractInstance = (AbstractInstance) obj;
-
- return this.instanceConnectionInfo.equals(abstractInstance.getInstanceConnectionInfo());
- }
-
-
- @Override
- public int hashCode() {
-
- // Fall back since dummy instances do not have a instanceConnectionInfo
- if (this.instanceConnectionInfo == null) {
- return super.hashCode();
- }
-
- return this.instanceConnectionInfo.hashCode();
- }
-
- /**
- * Triggers the remote task manager to print out the current utilization of its read and write buffers to its logs.
- *
- * @throws IOException
- * thrown if an error occurs while transmitting the request
- */
- public synchronized void logBufferUtilization() throws IOException {
-
- getTaskManagerProxy().logBufferUtilization();
- }
-
- /**
- * Kills the task manager running on this instance. This method is mainly intended to test and debug Nephele's fault
- * tolerance mechanisms.
- *
- * @throws IOException
- * thrown if an error occurs while transmitting the request
- */
- public synchronized void killTaskManager() throws IOException {
-
- getTaskManagerProxy().killTaskManager();
- }
-
- /**
- * Invalidates the entries identified by the given channel IDs from the remote task manager's receiver lookup cache.
- *
- * @param channelIDs
- * the channel IDs identifying the cache entries to invalidate
- * @throws IOException
- * thrown if an error occurs during this remote procedure call
- */
- public synchronized void invalidateLookupCacheEntries(final Set<ChannelID> channelIDs) throws IOException {
-
- getTaskManagerProxy().invalidateLookupCacheEntries(channelIDs);
- }
-
- /**
- * Destroys all RPC stub objects attached to this instance.
- */
- public synchronized void destroyProxies() {
-
- destroyTaskManagerProxy();
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocatedResource.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocatedResource.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocatedResource.java
index eb0a835..7f2ad04 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocatedResource.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocatedResource.java
@@ -23,7 +23,7 @@ import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
/**
* An allocated resource object unambiguously defines the
* hardware resources which have been assigned to an {@link eu.stratosphere.nephele.executiongraph.ExecutionVertex} for
- * executing a task. The allocated resource is comprised of an {@link eu.stratosphere.nephele.instance.AbstractInstance}
+ * executing a task. The allocated resource is comprised of an {@link Instance}
* which identifies the node the task is scheduled to run on as well as an
* {@link eu.stratosphere.nephele.instance.AllocationID} which determines the resources the task is scheduled to
* allocate within the node.
@@ -36,12 +36,7 @@ public final class AllocatedResource {
/**
* The instance a task is scheduled to run on.
*/
- private final AbstractInstance instance;
-
- /**
- * The instance type this allocated resource represents.
- */
- private final InstanceType instanceType;
+ private final Instance instance;
/**
* The allocation ID identifying the resources within the instance
@@ -60,24 +55,20 @@ public final class AllocatedResource {
*
* @param instance
* the instance a task is scheduled to run on.
- * @param instanceType
- * the instance type this allocated resource represents
* @param allocationID
* the allocation ID identifying the allocated resources within the instance
*/
- public AllocatedResource(final AbstractInstance instance, final InstanceType instanceType,
- final AllocationID allocationID) {
+ public AllocatedResource(final Instance instance, final AllocationID allocationID) {
this.instance = instance;
- this.instanceType = instanceType;
this.allocationID = allocationID;
}
/**
* Returns the instance a task is scheduled to run on.
- *
+ *
* @return the instance a task is scheduled to run on
*/
- public AbstractInstance getInstance() {
+ public Instance getInstance() {
return this.instance;
}
@@ -90,15 +81,6 @@ public final class AllocatedResource {
return this.allocationID;
}
- /**
- * Returns the instance type this allocated resource represents.
- *
- * @return the instance type this allocated resource represents
- */
- public InstanceType getInstanceType() {
- return this.instanceType;
- }
-
@Override
public boolean equals(final Object obj) {
@@ -120,16 +102,6 @@ public final class AllocatedResource {
}
}
- if (this.instanceType == null) {
- if (allocatedResource.instance != null) {
- return false;
- }
- } else {
- if (!this.instanceType.equals(allocatedResource.getInstanceType())) {
- return false;
- }
- }
-
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocatedSlot.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocatedSlot.java
new file mode 100644
index 0000000..0641944
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocatedSlot.java
@@ -0,0 +1,65 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.instance;
+
+import eu.stratosphere.nephele.jobgraph.JobID;
+
+/**
+ * An allocated slot is a part of an instance which is assigned to a job.
+ * <p>
+ * Instances are immutable (all fields are final and never reassigned), hence this class is thread-safe.
+ *
+ */
+public class AllocatedSlot {
+
+ /**
+ * The allocation ID which identifies the resources occupied by this slot.
+ */
+ private final AllocationID allocationID;
+
+ /**
+ * The ID of the job this slot belongs to.
+ */
+ private final JobID jobID;
+
+ /**
+ * Creates a new allocated slot for the given job, identified by a freshly generated allocation ID.
+ *
+ * @param jobID
+ * the ID of the job this slot belongs to
+ */
+ public AllocatedSlot(final JobID jobID) {
+
+ this.allocationID = new AllocationID();
+ this.jobID = jobID;
+ }
+
+ /**
+ * Returns the allocation ID of this slot.
+ *
+ * @return the allocation ID of this slot
+ */
+ public AllocationID getAllocationID() {
+ return this.allocationID;
+ }
+
+ /**
+ * Returns the ID of the job this allocated slot belongs to.
+ *
+ * @return the ID of the job this allocated slot belongs to
+ */
+ public JobID getJobID() {
+ return this.jobID;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocationID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocationID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocationID.java
index 3c83b80..3ed5013 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocationID.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocationID.java
@@ -17,8 +17,8 @@ import eu.stratosphere.nephele.AbstractID;
/**
* An allocation ID unambiguously identifies the allocated resources
- * within an {@link AbstractInstance}. The ID is necessary if an {@link InstanceManager} decides to partition
- * {@link AbstractInstance}s
+ * within an {@link Instance}. The ID is necessary if an {@link InstanceManager} decides to partition
+ * {@link Instance}s
* without the knowledge of Nephele's scheduler.
*
*/
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/DefaultInstanceManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/DefaultInstanceManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/DefaultInstanceManager.java
new file mode 100644
index 0000000..7d5f31b
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/DefaultInstanceManager.java
@@ -0,0 +1,403 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.instance;
+
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.configuration.GlobalConfiguration;
+import eu.stratosphere.nephele.jobgraph.JobID;
+import eu.stratosphere.nephele.topology.NetworkNode;
+import eu.stratosphere.nephele.topology.NetworkTopology;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Collection;
+import java.util.TimerTask;
+import java.util.Timer;
+
+/**
+ * In Nephele an instance manager maintains the set of available compute resources. It is responsible for allocating new
+ * compute resources, provisioning available compute resources to the JobManager and keeping track of the availability
+ * of the utilized compute resources in order to report unexpected resource outages.
+ * <p>
+ * The registered hosts, the instance listener and the shutdown flag are guarded by a single internal lock object.
+ *
+ */
+public class DefaultInstanceManager implements InstanceManager {
+
+ // ------------------------------------------------------------------------
+ // Internal Constants
+ // ------------------------------------------------------------------------
+
+ /**
+ * The log object used to report debugging and error information.
+ */
+ private static final Log LOG = LogFactory.getLog(DefaultInstanceManager.class);
+
+ /**
+ * Default duration in seconds after which a host is purged in case it did not send
+ * a heart-beat message.
+ */
+ private static final int DEFAULT_CLEANUP_INTERVAL = 2 * 60; // 2 min.
+
+ /**
+ * The key to retrieve the clean up interval (in seconds) from the configuration.
+ */
+ private static final String CLEANUP_INTERVAL_KEY = "instancemanager.cluster.cleanupinterval";
+
+ // ------------------------------------------------------------------------
+ // Fields
+ // ------------------------------------------------------------------------
+
+ private final Object lock = new Object();
+
+ /**
+ * Duration in milliseconds after which a host is purged in case it did not send a
+ * heart-beat message.
+ */
+ private final long cleanUpInterval;
+
+ /**
+ * Set of hosts known to run a task manager that are thus able to execute
+ * tasks.
+ */
+ private final Map<InstanceConnectionInfo, Instance> registeredHosts;
+
+ /**
+ * The network topology of the cluster.
+ */
+ private final NetworkTopology networkTopology;
+
+ /**
+ * Object that is notified if instances become available or vanish.
+ */
+ private InstanceListener instanceListener;
+
+ /** Set once {@link #shutdown()} has run; subsequent shutdown calls return immediately. */
+ private boolean shutdown;
+
+ /**
+ * Periodic task that checks whether hosts have not sent their heart-beat
+ * messages and purges the hosts in this case.
+ */
+ private final TimerTask cleanupStaleMachines = new TimerTask() {
+
+ @Override
+ public void run() {
+
+ synchronized (DefaultInstanceManager.this.lock) {
+
+ final List<Map.Entry<InstanceConnectionInfo, Instance>> hostsToRemove =
+ new ArrayList<Map.Entry<InstanceConnectionInfo, Instance>>();
+
+ final Map<JobID, List<AllocatedResource>> staleResources = new HashMap<JobID, List<AllocatedResource>>();
+
+ // check all hosts whether they did not send heart-beat messages.
+ for (Map.Entry<InstanceConnectionInfo, Instance> entry : registeredHosts.entrySet()) {
+
+ final Instance host = entry.getValue();
+ if (!host.isStillAlive(cleanUpInterval)) {
+
+ // this host has not sent the heart-beat messages
+ // -> we terminate all instances running on this host and notify the jobs
+ final Collection<AllocatedSlot> slots = host.removeAllocatedSlots();
+ for (AllocatedSlot slot : slots) {
+
+ final JobID jobID = slot.getJobID();
+
+ List<AllocatedResource> staleResourcesOfJob = staleResources.get(jobID);
+ if (staleResourcesOfJob == null) {
+ staleResourcesOfJob = new ArrayList<AllocatedResource>();
+ staleResources.put(jobID, staleResourcesOfJob);
+ }
+
+ staleResourcesOfJob.add(new AllocatedResource(host, slot.getAllocationID()));
+ }
+
+ // the host is purged from the registry, so close the RPC proxies pointing to it (as shutdown() does)
+ host.destroyProxies();
+ hostsToRemove.add(entry);
+ }
+ }
+
+ registeredHosts.entrySet().removeAll(hostsToRemove);
+
+ final Iterator<Map.Entry<JobID, List<AllocatedResource>>> it = staleResources.entrySet().iterator();
+ while (it.hasNext()) {
+ final Map.Entry<JobID, List<AllocatedResource>> entry = it.next();
+ if (instanceListener != null) {
+ instanceListener.allocatedResourcesDied(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+ }
+ };
+
+ // ------------------------------------------------------------------------
+ // Constructor and set-up
+ // ------------------------------------------------------------------------
+
+ /**
+ * Constructor.
+ */
+ public DefaultInstanceManager() {
+
+ this.registeredHosts = new HashMap<InstanceConnectionInfo, Instance>();
+
+ long tmpCleanUpInterval = (long) GlobalConfiguration.getInteger(CLEANUP_INTERVAL_KEY, DEFAULT_CLEANUP_INTERVAL) * 1000;
+
+ if (tmpCleanUpInterval < 10 * 1000L) { // Clean up interval must be at least ten seconds (value is in ms here)
+ LOG.warn("Invalid clean up interval. Reverting to default cleanup interval of " + DEFAULT_CLEANUP_INTERVAL
+ + " secs.");
+ tmpCleanUpInterval = DEFAULT_CLEANUP_INTERVAL * 1000L; // seconds -> milliseconds
+ }
+
+ this.cleanUpInterval = tmpCleanUpInterval;
+
+ this.networkTopology = NetworkTopology.createEmptyTopology();
+
+ // check for crashed hosts once per second; the task itself applies cleanUpInterval
+ final boolean runTimerAsDaemon = true;
+ new Timer(runTimerAsDaemon).schedule(cleanupStaleMachines, 1000, 1000);
+ }
+
+ @Override
+ public void shutdown() {
+ synchronized (this.lock) {
+ if (this.shutdown) {
+ return;
+ }
+
+ this.cleanupStaleMachines.cancel();
+
+ Iterator<Instance> it = this.registeredHosts.values().iterator();
+ while (it.hasNext()) {
+ it.next().destroyProxies();
+ }
+ this.registeredHosts.clear();
+
+ this.shutdown = true;
+ }
+ }
+
+ @Override
+ public void releaseAllocatedResource(AllocatedResource allocatedResource) throws InstanceException
+ {
+ synchronized (this.lock) {
+ // release the instance from the host
+ final Instance clusterInstance = allocatedResource.getInstance();
+ clusterInstance.releaseSlot(allocatedResource.getAllocationID());
+ }
+ }
+
+ /**
+ * Creates a new {@link Instance} object to manage instances that can
+ * be executed on that host.
+ *
+ * @param instanceConnectionInfo
+ * the connection information for the instance
+ * @param hardwareDescription
+ * the hardware description provided by the new instance
+ * @param numberOfSlots
+ * number of slots available on the instance
+ * @return a new {@link Instance} object or <code>null</code> if the cluster instance could not be created
+ */
+ private Instance createNewHost(final InstanceConnectionInfo instanceConnectionInfo,
+ final HardwareDescription hardwareDescription, int numberOfSlots) {
+
+ // Try to match new host with a stub host from the existing topology
+ String instanceName = instanceConnectionInfo.hostname();
+ NetworkNode parentNode = this.networkTopology.getRootNode();
+ NetworkNode currentStubNode = null;
+
+ // Try to match new host using the host name
+ while (true) {
+
+ currentStubNode = this.networkTopology.getNodeByName(instanceName);
+ if (currentStubNode != null) {
+ break;
+ }
+
+ final int pos = instanceName.lastIndexOf('.');
+ if (pos == -1) {
+ break;
+ }
+
+ /*
+ * If host name is reported as FQDN, iterative remove parts
+ * of the domain name until a match occurs or no more dots
+ * can be found in the host name.
+ */
+ instanceName = instanceName.substring(0, pos);
+ }
+
+ // Try to match the new host using the IP address
+ if (currentStubNode == null) {
+ instanceName = instanceConnectionInfo.address().toString();
+ instanceName = instanceName.replaceAll("/", ""); // Remove any / characters
+ currentStubNode = this.networkTopology.getNodeByName(instanceName);
+ }
+
+ if (currentStubNode != null) {
+ /*
+ * The instance name will be the same as the one of the stub node. That way
+ * the stub now will be removed from the network topology and replaced be
+ * the new node.
+ */
+ if (currentStubNode.getParentNode() != null) {
+ parentNode = currentStubNode.getParentNode();
+ }
+ // Remove the stub node from the tree
+ currentStubNode.remove();
+ }
+
+ LOG.info("Creating instance for " + instanceConnectionInfo + ", parent is "
+ + parentNode.getName());
+ final Instance host = new Instance(instanceConnectionInfo, parentNode,
+ this.networkTopology, hardwareDescription, numberOfSlots);
+
+ return host;
+ }
+
+ @Override
+ public void reportHeartBeat(InstanceConnectionInfo instanceConnectionInfo) {
+
+ synchronized (this.lock) {
+ Instance host = registeredHosts.get(instanceConnectionInfo);
+
+ if(host == null){
+ LOG.error("Task manager with connection info " + instanceConnectionInfo + " has not been registered.");
+ return;
+ }
+
+ host.reportHeartBeat();
+ }
+ }
+
+ @Override
+ public void registerTaskManager(InstanceConnectionInfo instanceConnectionInfo,
+ HardwareDescription hardwareDescription, int numberOfSlots){
+ synchronized(this.lock){
+ if(registeredHosts.containsKey(instanceConnectionInfo)){
+ LOG.error("Task manager with connection info " + instanceConnectionInfo + " has already been " +
+ "registered.");
+ return;
+ }
+
+ Instance host = createNewHost(instanceConnectionInfo, hardwareDescription, numberOfSlots);
+
+ if(host == null){
+ LOG.error("Could not create a new host object for register task manager for connection info " +
+ instanceConnectionInfo);
+ return;
+ }
+
+ this.registeredHosts.put(instanceConnectionInfo, host);
+ LOG.info("New number of registered hosts is " + this.registeredHosts.size());
+
+ host.reportHeartBeat();
+ }
+ }
+
+ @Override
+ public void requestInstance(JobID jobID, Configuration conf, int requiredSlots)
+ throws InstanceException
+ {
+
+ synchronized(this.lock) {
+ Iterator<Instance> clusterIterator = this.registeredHosts.values().iterator();
+ Instance instance = null;
+ List<AllocatedResource> allocatedResources = new ArrayList<AllocatedResource>();
+ int allocatedSlots = 0;
+
+ while(clusterIterator.hasNext() && allocatedSlots < requiredSlots) {
+ instance = clusterIterator.next();
+ while(instance.getNumberOfAvailableSlots() >0 && allocatedSlots < requiredSlots){
+ AllocatedResource resource = instance.allocateSlot(jobID);
+ allocatedResources.add(resource);
+ allocatedSlots++;
+ }
+ }
+
+ if(allocatedSlots < requiredSlots){
+ // hand back the partial allocation, otherwise these slots stay blocked although nobody uses them
+ for (AllocatedResource resource : allocatedResources) {
+ resource.getInstance().releaseSlot(resource.getAllocationID());
+ }
+ throw new InstanceException("Cannot allocate the required number of slots: " + requiredSlots + ".");
+ }
+
+ if (this.instanceListener != null) {
+ final InstanceNotifier instanceNotifier = new InstanceNotifier(
+ this.instanceListener, jobID, allocatedResources);
+ instanceNotifier.start();
+ }
+ }
+ }
+
+ @Override
+ public NetworkTopology getNetworkTopology(JobID jobID) {
+ return this.networkTopology;
+ }
+
+ @Override
+ public void setInstanceListener(InstanceListener instanceListener) {
+ synchronized (this.lock) {
+ this.instanceListener = instanceListener;
+ }
+ }
+
+ @Override
+ public Instance getInstanceByName(String name) {
+ if (name == null) {
+ throw new IllegalArgumentException("Argument name must not be null");
+ }
+
+ synchronized (this.lock) {
+ final Iterator<Instance> it = this.registeredHosts.values().iterator();
+ while (it.hasNext()) {
+ final Instance instance = it.next();
+ if (name.equals(instance.getName())) {
+ return instance;
+ }
+ }
+ }
+
+ return null;
+ }
+
+ @Override
+ public int getNumberOfTaskTrackers() {
+ synchronized (this.lock) { // the cleanup timer mutates registeredHosts concurrently
+ return this.registeredHosts.size();
+ }
+ }
+
+ @Override
+ public int getNumberOfSlots() {
+ synchronized (this.lock) { // iterating while the cleanup timer removes hosts would fail otherwise
+ int slots = 0;
+
+ for(Instance instance: registeredHosts.values()){
+ slots += instance.getNumberOfSlots();
+ }
+
+ return slots;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/DummyInstance.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/DummyInstance.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/DummyInstance.java
index 4e0f004..56f44c6 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/DummyInstance.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/DummyInstance.java
@@ -14,32 +14,30 @@
package eu.stratosphere.nephele.instance;
/**
- * A DummyInstance is a stub implementation of the {@link AbstractInstance} interface.
+ * A DummyInstance is a stub implementation of the {@link Instance} interface.
* Dummy instances are used to plan a job execution but must be replaced with
* concrete instances before the job execution starts.
*
*/
-public class DummyInstance extends AbstractInstance {
+public class DummyInstance extends Instance {
+ /**
+ * Source of the numeric suffix for dummy instance names; incremented by the static synchronized factory below.
+ */
private static int nextID = 0;
+ /**
+ * The name of this dummy instance: "DummyInstance_" followed by its ID.
+ */
private final String name;
+ /**
+ * Creates a new dummy instance whose ID is the current value of a class-wide counter, which is then incremented.
+ *
+ * @return the new dummy instance
+ */
- public static synchronized DummyInstance createDummyInstance(InstanceType type) {
+ public static synchronized DummyInstance createDummyInstance() {
- return new DummyInstance(type, nextID++);
+ return new DummyInstance(nextID++);
}
/**
- * Constructs a new dummy instance of the given instance type.
+ * Constructs a new dummy instance with the given ID.
*
- * @param type
- * the type of the new dummy instance
* @param id
* the ID of the dummy instance
*/
- private DummyInstance(InstanceType type, int id) {
- super(type, null, null, null, null);
+ private DummyInstance(int id) {
+ // No connection info, parent node, network topology or hardware description, and zero slots
+ super(null, null, null, null, 0);
this.name = "DummyInstance_" + Integer.toString(id);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/Hardware.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/Hardware.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/Hardware.java
new file mode 100644
index 0000000..398a2a8
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/Hardware.java
@@ -0,0 +1,24 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.instance;
+
+/**
+ * Utility for querying hardware properties of the machine this JVM is running on.
+ */
+public class Hardware {
+
+ /**
+ * Returns the number of processors available to the Java virtual machine.
+ *
+ * @return the value reported by {@link Runtime#availableProcessors()}
+ */
+ public static int getNumberCPUCores() {
+ final Runtime runtime = Runtime.getRuntime();
+ final int cores = runtime.availableProcessors();
+ return cores;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/Instance.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/Instance.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/Instance.java
new file mode 100644
index 0000000..fa17745
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/Instance.java
@@ -0,0 +1,362 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.instance;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.Collection;
+
+import eu.stratosphere.nephele.deployment.TaskDeploymentDescriptor;
+import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
+import eu.stratosphere.nephele.execution.librarycache.LibraryCacheProfileRequest;
+import eu.stratosphere.nephele.execution.librarycache.LibraryCacheProfileResponse;
+import eu.stratosphere.nephele.execution.librarycache.LibraryCacheUpdate;
+import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
+import eu.stratosphere.nephele.ipc.RPC;
+import eu.stratosphere.nephele.jobgraph.JobID;
+import eu.stratosphere.nephele.net.NetUtils;
+import eu.stratosphere.nephele.protocols.TaskOperationProtocol;
+import eu.stratosphere.nephele.taskmanager.TaskCancelResult;
+import eu.stratosphere.nephele.taskmanager.TaskKillResult;
+import eu.stratosphere.nephele.taskmanager.TaskSubmissionResult;
+import eu.stratosphere.nephele.topology.NetworkNode;
+import eu.stratosphere.nephele.topology.NetworkTopology;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+
+/**
+ * An instance represents a resource a {@link eu.stratosphere.nephele.taskmanager.TaskManager} runs on.
+ * <p>
+ * Its mutable state (the task manager RPC proxy, the heartbeat timestamp and the slot bookkeeping) is only accessed
+ * while holding this object's monitor.
+ */
+public class Instance extends NetworkNode {
+ /**
+ * The connection info identifying the instance; <code>null</code> for dummy instances.
+ */
+ private final InstanceConnectionInfo instanceConnectionInfo;
+
+ /**
+ * The hardware description as reported by the instance itself.
+ */
+ private final HardwareDescription hardwareDescription;
+
+ /**
+ * Number of slots available on the node.
+ */
+ private final int numberOfSlots;
+
+ /**
+ * Allocated slots on this instance, keyed by their allocation ID. Guarded by this object's monitor.
+ */
+ private final Map<AllocationID, AllocatedSlot> allocatedSlots = new HashMap<AllocationID, AllocatedSlot>();
+
+ /**
+ * Lazily created RPC stub object for the instance's task manager. Guarded by this object's monitor.
+ */
+ private TaskOperationProtocol taskManager = null;
+
+ /**
+ * Time when the last heartbeat has been received from the task manager running on this instance.
+ */
+ private long lastReceivedHeartBeat = System.currentTimeMillis();
+
+ /**
+ * Constructs an instance object.
+ *
+ * @param instanceConnectionInfo
+ * the connection info identifying the instance; may be <code>null</code> (as used by dummy instances)
+ * @param parentNode
+ * the parent node in the network topology
+ * @param networkTopology
+ * the network topology this node is a part of
+ * @param hardwareDescription
+ * the hardware description provided by the instance itself
+ * @param numberOfSlots
+ * the number of task slots this instance offers
+ */
+ public Instance(final InstanceConnectionInfo instanceConnectionInfo,
+ final NetworkNode parentNode, final NetworkTopology networkTopology,
+ final HardwareDescription hardwareDescription, int numberOfSlots) {
+ // Without connection info (dummy instances) pass null instead of its string form
+ super((instanceConnectionInfo == null) ? null : instanceConnectionInfo.toString(), parentNode, networkTopology);
+ this.instanceConnectionInfo = instanceConnectionInfo;
+ this.hardwareDescription = hardwareDescription;
+ this.numberOfSlots = numberOfSlots;
+ }
+
+ /**
+ * Creates or returns the RPC stub object for the instance's task manager.
+ * All callers hold this object's monitor.
+ *
+ * @return the RPC stub object for the instance's task manager
+ * @throws IOException
+ * thrown if the RPC stub object for the task manager cannot be created
+ */
+ private TaskOperationProtocol getTaskManagerProxy() throws IOException {
+
+ if (this.taskManager == null) {
+
+ this.taskManager = RPC.getProxy(TaskOperationProtocol.class,
+ new InetSocketAddress(getInstanceConnectionInfo().address(),
+ getInstanceConnectionInfo().ipcPort()), NetUtils.getSocketFactory());
+ }
+
+ return this.taskManager;
+ }
+
+ /**
+ * Destroys and removes the RPC stub object for this instance's task manager.
+ * All callers hold this object's monitor.
+ */
+ private void destroyTaskManagerProxy() {
+
+ if (this.taskManager != null) {
+ RPC.stopProxy(this.taskManager);
+ this.taskManager = null;
+ }
+ }
+
+ /**
+ * Returns the instance's connection information object.
+ *
+ * @return the instance's connection information object, <code>null</code> for dummy instances
+ */
+ public final InstanceConnectionInfo getInstanceConnectionInfo() {
+ return this.instanceConnectionInfo;
+ }
+
+ /**
+ * Returns the instance's hardware description as reported by the instance itself.
+ *
+ * @return the instance's hardware description
+ */
+ public HardwareDescription getHardwareDescription() {
+ return this.hardwareDescription;
+ }
+
+ /**
+ * Checks if all the libraries required to run the job with the given
+ * job ID are available on this instance. Any library that is missing
+ * is transferred to the instance as a result of this call.
+ *
+ * @param jobID
+ * the ID of the job whose libraries are to be checked for
+ * @throws IOException
+ * thrown if no library entry exists for the job or an error occurs while checking for the libraries
+ */
+ public synchronized void checkLibraryAvailability(final JobID jobID) throws IOException {
+
+ // Now distribute the required libraries for the job
+ final String[] requiredLibraries = LibraryCacheManager.getRequiredJarFiles(jobID);
+
+ if (requiredLibraries == null) {
+ throw new IOException("No entry of required libraries for job " + jobID);
+ }
+
+ final LibraryCacheProfileRequest request = new LibraryCacheProfileRequest();
+ request.setRequiredLibraries(requiredLibraries);
+
+ // Ask the task manager which of the required libraries it has already cached
+ final TaskOperationProtocol proxy = getTaskManagerProxy();
+ final LibraryCacheProfileResponse response = proxy.getLibraryCacheProfile(request);
+
+ // Transfer every library the task manager reported as not cached
+ for (int k = 0; k < requiredLibraries.length; k++) {
+ if (!response.isCached(k)) {
+ proxy.updateLibraryCache(new LibraryCacheUpdate(requiredLibraries[k]));
+ }
+ }
+ }
+
+ /**
+ * Submits a list of tasks to the instance's {@link eu.stratosphere.nephele.taskmanager.TaskManager}.
+ *
+ * @param tasks
+ * the list of tasks to be submitted
+ * @return the result of the submission attempt
+ * @throws IOException
+ * thrown if an error occurs while transmitting the task
+ */
+ public synchronized List<TaskSubmissionResult> submitTasks(final List<TaskDeploymentDescriptor> tasks)
+ throws IOException {
+
+ return getTaskManagerProxy().submitTasks(tasks);
+ }
+
+ /**
+ * Cancels the task identified by the given ID at the instance's
+ * {@link eu.stratosphere.nephele.taskmanager.TaskManager}.
+ *
+ * @param id
+ * the ID identifying the task to be canceled
+ * @throws IOException
+ * thrown if an error occurs while transmitting the request or receiving the response
+ * @return the result of the cancel attempt
+ */
+ public synchronized TaskCancelResult cancelTask(final ExecutionVertexID id) throws IOException {
+
+ return getTaskManagerProxy().cancelTask(id);
+ }
+
+ /**
+ * Kills the task identified by the given ID at the instance's
+ * {@link eu.stratosphere.nephele.taskmanager.TaskManager}.
+ *
+ * @param id
+ * the ID identifying the task to be killed
+ * @throws IOException
+ * thrown if an error occurs while transmitting the request or receiving the response
+ * @return the result of the kill attempt
+ */
+ public synchronized TaskKillResult killTask(final ExecutionVertexID id) throws IOException {
+
+ return getTaskManagerProxy().killTask(id);
+ }
+
+ /**
+ * Updates the time of the last received heartbeat to the current system time.
+ */
+ public synchronized void reportHeartBeat() {
+ this.lastReceivedHeartBeat = System.currentTimeMillis();
+ }
+
+ /**
+ * Returns whether the host is still alive.
+ *
+ * @param cleanUpInterval
+ * duration (in milliseconds) after which a host is
+ * considered dead if it has not sent any heartbeats
+ * @return <code>true</code> if the host has sent a heartbeat before the <code>cleanUpInterval</code> duration
+ * has expired, <code>false</code> otherwise
+ */
+ public synchronized boolean isStillAlive(final long cleanUpInterval) {
+ return this.lastReceivedHeartBeat + cleanUpInterval >= System.currentTimeMillis();
+ }
+
+
+ @Override
+ public boolean equals(final Object obj) {
+
+ // Fall back since dummy instances do not have an instanceConnectionInfo
+ if (this.instanceConnectionInfo == null) {
+ return super.equals(obj);
+ }
+
+ if (!(obj instanceof Instance)) {
+ return false;
+ }
+
+ final Instance other = (Instance) obj;
+
+ return this.instanceConnectionInfo.equals(other.getInstanceConnectionInfo());
+ }
+
+
+ @Override
+ public int hashCode() {
+
+ // Fall back since dummy instances do not have an instanceConnectionInfo
+ if (this.instanceConnectionInfo == null) {
+ return super.hashCode();
+ }
+
+ return this.instanceConnectionInfo.hashCode();
+ }
+
+ /**
+ * Triggers the remote task manager to print out the current utilization of its read and write buffers to its logs.
+ *
+ * @throws IOException
+ * thrown if an error occurs while transmitting the request
+ */
+ public synchronized void logBufferUtilization() throws IOException {
+
+ getTaskManagerProxy().logBufferUtilization();
+ }
+
+ /**
+ * Kills the task manager running on this instance. This method is mainly intended to test and debug Nephele's fault
+ * tolerance mechanisms.
+ *
+ * @throws IOException
+ * thrown if an error occurs while transmitting the request
+ */
+ public synchronized void killTaskManager() throws IOException {
+
+ getTaskManagerProxy().killTaskManager();
+ }
+
+ /**
+ * Invalidates the entries identified by the given channel IDs from the remote task manager's receiver lookup cache.
+ *
+ * @param channelIDs
+ * the channel IDs identifying the cache entries to invalidate
+ * @throws IOException
+ * thrown if an error occurs during this remote procedure call
+ */
+ public synchronized void invalidateLookupCacheEntries(final Set<ChannelID> channelIDs) throws IOException {
+ getTaskManagerProxy().invalidateLookupCacheEntries(channelIDs);
+ }
+
+ /**
+ * Destroys all RPC stub objects attached to this instance.
+ */
+ public synchronized void destroyProxies() {
+
+ destroyTaskManagerProxy();
+
+ }
+
+ /**
+ * Returns the total number of slots this instance offers.
+ *
+ * @return the number of slots passed to the constructor
+ */
+ public int getNumberOfSlots() {
+ return numberOfSlots;
+ }
+
+ /**
+ * Returns the number of slots that are currently not allocated.
+ * Synchronized because the slot map is modified under this object's monitor.
+ *
+ * @return the number of free slots
+ */
+ public synchronized int getNumberOfAvailableSlots() {
+ return this.numberOfSlots - this.allocatedSlots.size();
+ }
+
+ /**
+ * Allocates one slot on this instance for the given job.
+ *
+ * @param jobID
+ * the ID of the job the slot is allocated for
+ * @return the allocated resource referring to this instance and the new slot's allocation ID
+ * @throws InstanceException
+ * thrown if all slots of this instance are already allocated
+ */
+ public synchronized AllocatedResource allocateSlot(JobID jobID) throws InstanceException {
+ if (this.allocatedSlots.size() < this.numberOfSlots) {
+ final AllocatedSlot slot = new AllocatedSlot(jobID);
+
+ this.allocatedSlots.put(slot.getAllocationID(), slot);
+ return new AllocatedResource(this, slot.getAllocationID());
+ } else {
+ throw new InstanceException("Overbooking instance " + instanceConnectionInfo + ".");
+ }
+ }
+
+ /**
+ * Releases the slot with the given allocation ID.
+ *
+ * @param allocationID
+ * the allocation ID of the slot to release
+ * @throws IllegalArgumentException
+ * thrown if no slot with the given allocation ID is allocated on this instance
+ */
+ public synchronized void releaseSlot(AllocationID allocationID) {
+ // Values are never null (see allocateSlot), so a null result means the ID was unknown
+ if (this.allocatedSlots.remove(allocationID) == null) {
+ throw new IllegalArgumentException("There is no slot registered with allocation ID " + allocationID + ".");
+ }
+ }
+
+ /**
+ * Returns a snapshot of the slots currently allocated on this instance. The returned collection is a copy; it does
+ * not reflect later allocations or releases, and modifying it does not affect this instance.
+ *
+ * @return a copy of the currently allocated slots
+ */
+ public synchronized Collection<AllocatedSlot> getAllocatedSlots() {
+ return new ArrayList<AllocatedSlot>(this.allocatedSlots.values());
+ }
+
+ /**
+ * Atomically releases all slots allocated on this instance.
+ * The whole operation holds this object's monitor, so no other thread can release a slot between taking the
+ * snapshot and releasing it (which would otherwise make {@link #releaseSlot(AllocationID)} throw).
+ *
+ * @return the slots that were allocated before this call
+ */
+ public synchronized Collection<AllocatedSlot> removeAllocatedSlots() {
+ final Collection<AllocatedSlot> slots = new ArrayList<AllocatedSlot>(this.allocatedSlots.values());
+
+ for (AllocatedSlot slot : slots) {
+ releaseSlot(slot.getAllocationID());
+ }
+
+ return slots;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceManager.java
index a1015b5..00795f4 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceManager.java
@@ -13,157 +13,32 @@
package eu.stratosphere.nephele.instance;
-import java.util.List;
-import java.util.Map;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.topology.NetworkTopology;
-/**
- * In Nephele an instance manager maintains the set of available compute resources. It is responsible for allocating new
- * compute resources,
- * provisioning available compute resources to the JobManager and keeping track of the availability of the utilized
- * compute resources in order
- * to report unexpected resource outages.
- *
- */
+/**
+ * An instance manager maintains the set of available compute resources, i.e. the registered task managers and the
+ * slots they offer, and provides them to the job manager.
+ */
public interface InstanceManager {
- /**
- * Requests an instance of the provided instance type from the instance manager.
- *
- * @param jobID
- * the ID of the job this instance is requested for
- * @param conf
- * a configuration object including additional request information (e.g. credentials)
- * @param instanceRequestMap
- * a map specifying the instances requested by this call
- * @param count
- * the number of instances
- * @throws InstanceException
- * thrown if an error occurs during the instance request
- */
- void requestInstance(JobID jobID, Configuration conf, InstanceRequestMap instanceRequestMap,
- List<String> splitAffinityList) throws InstanceException;
- /**
- * Releases an allocated resource from a job.
- *
- * @param jobID
- * the ID of the job the instance has been used for
- * @param conf
- * a configuration object including additional release information (e.g. credentials)
- * @param allocatedResource
- * the allocated resource to be released
- * @throws InstanceException
- * thrown if an error occurs during the release process
- */
- void releaseAllocatedResource(JobID jobID, Configuration conf, AllocatedResource allocatedResource)
- throws InstanceException;
-
- /**
- * Suggests a suitable instance type according to the provided hardware characteristics.
- *
- * @param minNumComputeUnits
- * the minimum number of compute units
- * @param minNumCPUCores
- * the minimum number of CPU cores
- * @param minMemorySize
- * the minimum number of main memory (in MB)
- * @param minDiskCapacity
- * the minimum hard disk capacity (in GB)
- * @param maxPricePerHour
- * the maximum price per hour for the instance
- * @return the instance type matching the requested hardware profile best or <code>null</code> if no such instance
- * type is available
- */
- InstanceType getSuitableInstanceType(int minNumComputeUnits, int minNumCPUCores, int minMemorySize,
- int minDiskCapacity, int maxPricePerHour);
+ /**
+ * Shuts the instance manager down and stops all its internal processes.
+ */
+ void shutdown();
- /**
- * Reports a heart beat message of an instance.
- *
- * @param instanceConnectionInfo
- * the {@link InstanceConnectionInfo} object attached to the heart beat message
- * @param hardwareDescription
- * a hardware description with details on the instance's compute resources.
- */
- void reportHeartBeat(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription hardwareDescription);
+ /**
+ * Releases an allocated resource.
+ *
+ * @param allocatedResource
+ * the allocated resource to be released
+ * @throws InstanceException
+ * thrown if an error occurs during the release process
+ */
+ void releaseAllocatedResource(AllocatedResource allocatedResource) throws InstanceException;
- /**
- * Translates the name of an instance type to the corresponding instance type object.
- *
- * @param instanceTypeName
- * the name of the instance type
- * @return the instance type object matching the name or <code>null</code> if no such instance type exists
- */
- InstanceType getInstanceTypeByName(String instanceTypeName);
+ /**
+ * Reports a heart beat message of an instance.
+ *
+ * @param instanceConnectionInfo
+ * the {@link InstanceConnectionInfo} object attached to the heart beat message
+ */
+ void reportHeartBeat(InstanceConnectionInfo instanceConnectionInfo);
- /**
- * Returns the default instance type used by the instance manager.
- *
- * @return the default instance type
- */
- InstanceType getDefaultInstanceType();
+ /**
+ * Registers a task manager with this instance manager.
+ *
+ * @param instanceConnectionInfo
+ * the connection info identifying the task manager's instance
+ * @param hardwareDescription
+ * a hardware description with details on the instance's compute resources
+ * @param numberOfSlots
+ * the number of task slots the task manager offers
+ */
+ void registerTaskManager(InstanceConnectionInfo instanceConnectionInfo,
+ HardwareDescription hardwareDescription, int numberOfSlots);
+ /**
+ * Requests compute resources for the job with the given ID.
+ *
+ * @param jobID
+ * the ID of the job the resources are requested for
+ * @param conf
+ * a configuration object including additional request information
+ * @param requiredSlots
+ * the number of slots required by the job (NOTE(review): confirm whether this is a total across instances)
+ * @throws InstanceException
+ * thrown if an error occurs during the instance request
+ */
+ void requestInstance(JobID jobID, Configuration conf, int requiredSlots)
+ throws InstanceException;
- /**
- * Returns the network topology for the job with the given ID. The network topology
- * for the job might only be an excerpt of the overall network topology. It only
- * includes those instances as leaf nodes which are really allocated for the
- * execution of the job.
- *
- * @param jobID
- * the ID of the job to get the topology for
- * @return the network topology for the job
- */
+ /**
+ * Returns the network topology for the job with the given ID.
+ *
+ * @param jobID
+ * the ID of the job to get the topology for
+ * @return the network topology for the job
+ */
NetworkTopology getNetworkTopology(JobID jobID);
- /**
- * Sets the {@link InstanceListener} object which is supposed to be
- * notified about instance availability and deaths.
- *
- * @param instanceListener
- * the instance listener to set for this instance manager
- */
+ /**
+ * Sets the {@link InstanceListener} object which is notified about instance-related events,
+ * such as resources having been allocated for a job.
+ *
+ * @param instanceListener
+ * the instance listener to set for this instance manager
+ */
void setInstanceListener(InstanceListener instanceListener);
- /**
- * Returns a map of all instance types which are currently available to Nephele. The map contains a description of
- * the hardware characteristics for each instance type as provided in the configuration file. Moreover, it contains
- * the actual hardware description as reported by task managers running on the individual instances. If available,
- * the map also contains the maximum number instances Nephele can allocate of each instance type (i.e. if no other
- * job occupies instances).
- *
- * @return a list of all instance types available to Nephele
- */
- Map<InstanceType, InstanceTypeDescription> getMapOfAvailableInstanceTypes();
-
- /**
- * Returns the {@link AbstractInstance} with the given name.
- *
- * @param name
- * the name of the instance
- * @return the instance with the given name or <code>null</code> if no such instance could be found
- */
- AbstractInstance getInstanceByName(String name);
+ /**
+ * Returns the {@link Instance} with the given name.
+ *
+ * @param name
+ * the name of the instance
+ * @return the instance with the given name or <code>null</code> if no such instance could be found
+ */
+ Instance getInstanceByName(String name);
- /**
- * Cancels all pending instance requests that might still exist for the job with the given ID.
- *
- * @param jobID
- * the ID of the job to cancel the pending instance requests for
- */
- void cancelPendingRequests(JobID jobID);
-
- /**
- * Shuts the instance manager down and stops all its internal processes.
- */
- void shutdown();
-
- /**
- *
- * @return the number of available (registered) TaskTrackers
- */
+ /**
+ * Returns the number of registered task managers.
+ *
+ * @return the number of available (registered) TaskTrackers
+ */
int getNumberOfTaskTrackers();
+
+ /**
+ * Returns the total number of task slots offered by all registered task managers.
+ *
+ * @return the sum of the slots of all registered instances
+ */
+ int getNumberOfSlots();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceNotifier.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceNotifier.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceNotifier.java
new file mode 100644
index 0000000..2df3d3d
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceNotifier.java
@@ -0,0 +1,71 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.instance;
+
+import java.util.List;
+
+import eu.stratosphere.nephele.instance.AllocatedResource;
+import eu.stratosphere.nephele.instance.InstanceListener;
+import eu.stratosphere.nephele.jobgraph.JobID;
+
+/**
+ * Auxiliary thread that delivers the notification about resources allocated on an
+ * {@link eu.stratosphere.nephele.instance.Instance} to an {@link InstanceListener}.
+ * The notification must be sent from a separate thread; otherwise, requesting an instance for a vertex and
+ * switching to the state ASSIGNING could not be guaranteed to happen as one atomic operation.
+ * All fields are final and assigned in the constructor.
+ *
+ */
+public class InstanceNotifier extends Thread {
+
+ /**
+ * Listener that receives the notification.
+ */
+ private final InstanceListener instanceListener;
+
+ /**
+ * ID of the job the allocated resources belong to.
+ */
+ private final JobID jobID;
+
+ /**
+ * Resources that have been allocated for the job.
+ */
+ private final List<AllocatedResource> allocatedResources;
+
+ /**
+ * Creates a notifier; the notification itself is delivered by {@link #run()}.
+ *
+ * @param instanceListener
+ * the listener to send the notification to
+ * @param jobID
+ * the ID of the job the notification refers to
+ * @param allocatedResources
+ * the resources that have been allocated for the job
+ */
+ public InstanceNotifier(final InstanceListener instanceListener, final JobID jobID,
+ final List<AllocatedResource> allocatedResources) {
+ this.allocatedResources = allocatedResources;
+ this.jobID = jobID;
+ this.instanceListener = instanceListener;
+ }
+
+ /**
+ * Forwards the job ID and its allocated resources to the listener.
+ */
+ @Override
+ public void run() {
+ final InstanceListener listener = this.instanceListener;
+ listener.resourcesAllocated(this.jobID, this.allocatedResources);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceRequestMap.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceRequestMap.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceRequestMap.java
deleted file mode 100644
index 4167f67..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceRequestMap.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.instance;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- * An instance request map specifies the required types of instances to run a specific job and the respective number
- * thereof. For each instance type it is possible to specify the minimum number of instances required to run the job. If
- * the {@link InstanceManager} cannot manage to provide at least this minimum numbers of instances for the given type,
- * the job will be rejected.
- * <p>
- * In addition, is it also possible to specify the optimal number of instances for a particular instance type. The
- * {@link InstanceManager} will try to provide this optimal number of instances, but will also start the job with less
- * instances.
- * <p>
- * This class is not thread-safe.
- *
- */
-public final class InstanceRequestMap {
-
- /**
- * The map holding the minimum number of instances to be requested for each instance type.
- */
- private final Map<InstanceType, Integer> minimumMap = new HashMap<InstanceType, Integer>();
-
- /**
- * The map holding the maximum number of instances to be requested for each instance type.
- */
- private final Map<InstanceType, Integer> maximumMap = new HashMap<InstanceType, Integer>();
-
- /**
- * Sets the minimum number of instances to be requested from the given instance type.
- *
- * @param instanceType
- * the type of instance to request
- * @param number
- * the minimum number of instances to request
- */
- public void setMinimumNumberOfInstances(final InstanceType instanceType, final int number) {
-
- this.minimumMap.put(instanceType, Integer.valueOf(number));
- }
-
- /**
- * Sets the maximum number of instances to be requested from the given instance type.
- *
- * @param instanceType
- * the type of instance to request
- * @param number
- * the maximum number of instances to request
- */
- public void setMaximumNumberOfInstances(final InstanceType instanceType, final int number) {
-
- this.maximumMap.put(instanceType, Integer.valueOf(number));
- }
-
- /**
- * Sets both the minimum and the maximum number of instances to be requested from the given instance type.
- *
- * @param instanceType
- * the type of instance to request
- * @param number
- * the minimum and the maximum number of instances to request
- */
- public void setNumberOfInstances(final InstanceType instanceType, final int number) {
-
- setMinimumNumberOfInstances(instanceType, number);
- setMaximumNumberOfInstances(instanceType, number);
- }
-
- /**
- * Returns the minimum number of instances to be requested from the given instance type.
- *
- * @param instanceType
- * the type of instance to request
- * @return the minimum number of instances to be requested from the given instance type
- */
- public int getMinimumNumberOfInstances(final InstanceType instanceType) {
-
- final Integer val = this.minimumMap.get(instanceType);
- if (val != null) {
- return val.intValue();
- }
-
- return 0;
- }
-
- /**
- * Returns the maximum number of instances to be requested from the given instance type.
- *
- * @param instanceType
- * the type of instance to request
- * @return the maximum number of instances to be requested from the given instance type
- */
- public int getMaximumNumberOfInstances(final InstanceType instanceType) {
-
- final Integer val = this.maximumMap.get(instanceType);
- if (val != null) {
- return val.intValue();
- }
-
- return 0;
- }
-
- /**
- * Checks if this instance request map is empty, i.e. neither contains an entry for the minimum or maximum number of
- * instances to be requested for any instance type.
- *
- * @return <code>true</code> if the map is empty, <code>false</code> otherwise
- */
- public boolean isEmpty() {
-
- if (!this.maximumMap.isEmpty()) {
- return false;
- }
-
- if (!this.minimumMap.isEmpty()) {
- return false;
- }
-
- return true;
- }
-
- /**
- * Returns an {@link Iterator} object which allows to traverse the minimum number of instances to be requested for
- * each instance type.
- *
- * @return an iterator to traverse the minimum number of instances to be requested for each instance type
- */
- public Iterator<Map.Entry<InstanceType, Integer>> getMaximumIterator() {
-
- return this.maximumMap.entrySet().iterator();
- }
-
- /**
- * Returns an {@link Iterator} object which allows to traverse the maximum number of instances to be requested for
- * each instance type.
- *
- * @return an iterator to traverse the maximum number of instances to be requested for each instance type
- */
- public Iterator<Map.Entry<InstanceType, Integer>> getMinimumIterator() {
-
- return this.minimumMap.entrySet().iterator();
- }
-
- /**
- * Returns the number of different instance types stored in this request map.
- *
- * @return the number of different instance types stored in this request map
- */
- public int size() {
-
- final int s = this.maximumMap.size();
-
- if (s != this.minimumMap.size()) {
- throw new IllegalStateException("InstanceRequestMap is in an inconsistent state");
- }
-
- return s;
- }
-
- /**
- * Clears the instance request map.
- */
- public void clear() {
-
- this.maximumMap.clear();
- this.minimumMap.clear();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceType.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceType.java
deleted file mode 100644
index f2bb4e5..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceType.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.instance;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.core.io.StringRecord;
-
-/**
- * Describes the hardware resources a task manager runs on: a number of compute units and CPU cores, an amount of
- * main memory (in MB) and of disk space (in GB), and the price per hour charged for running an instance of this type.
- * <p>
- * {@link #write(DataOutput)} and {@link #read(DataInput)} handle the fields in one fixed order: identifier, compute
- * units, cores, memory, disk capacity, price per hour.
- */
-public final class InstanceType implements IOReadableWritable {
-
- /** Identifier of this instance type. */
- private String identifier;
-
- /**
- * Number of computational units, i.e. the virtual compute capacity promised to a user. For example, a host with a
- * single-core 2 GHz CPU may possess 20 compute units (1*20), a dual-core 2.5 GHz CPU 50 compute units (2*25).
- */
- private int numberOfComputeUnits = 0;
-
- /** Number of CPU cores. */
- private int numberOfCores = 0;
-
- /** Amount of main memory, in MB. */
- private int memorySize = 0;
-
- /** Disk capacity, in GB. */
- private int diskCapacity = 0;
-
- /** Price per hour charged for running an instance of this type. */
- private int pricePerHour = 0;
-
- /**
- * Creates an empty instance type whose fields are meant to be filled by {@link #read(DataInput)}; required for
- * deserialization.
- */
- public InstanceType() {
- }
-
- /**
- * Creates a fully specified instance type.
- *
- * @param identifier
- * identifier of the instance type
- * @param numberOfComputeUnits
- * number of computational units
- * @param numberOfCores
- * number of CPU cores
- * @param memorySize
- * amount of main memory (in MB)
- * @param diskCapacity
- * disk capacity (in GB)
- * @param pricePerHour
- * price per hour charged for running an instance of this type
- */
- InstanceType(final String identifier, final int numberOfComputeUnits, final int numberOfCores,
- final int memorySize, final int diskCapacity, final int pricePerHour) {
- this.identifier = identifier;
- this.numberOfComputeUnits = numberOfComputeUnits;
- this.numberOfCores = numberOfCores;
- this.memorySize = memorySize;
- this.diskCapacity = diskCapacity;
- this.pricePerHour = pricePerHour;
- }
-
- /**
- * @return the number of computational units of this instance type
- */
- public int getNumberOfComputeUnits() {
- return this.numberOfComputeUnits;
- }
-
- /**
- * @return the number of CPU cores of this instance type
- */
- public int getNumberOfCores() {
- return this.numberOfCores;
- }
-
- /**
- * @return the amount of main memory of this instance type, in MB
- */
- public int getMemorySize() {
- return this.memorySize;
- }
-
- /**
- * @return the disk capacity of this instance type, in GB
- */
- public int getDiskCapacity() {
- return this.diskCapacity;
- }
-
- /**
- * @return the price per hour charged for running an instance of this type
- */
- public int getPricePerHour() {
- return this.pricePerHour;
- }
-
- /**
- * @return the identifier of this instance type
- */
- public String getIdentifier() {
- return this.identifier;
- }
-
- /**
- * Renders this type as {@code identifier (computeUnits,cores,memory,disk,price)}.
- */
- @Override
- public String toString() {
- return this.identifier + " (" + this.numberOfComputeUnits + ',' + this.numberOfCores + ',' + this.memorySize
- + ',' + this.diskCapacity + ',' + this.pricePerHour + ')';
- }
-
- @Override
- public void write(final DataOutput out) throws IOException {
- StringRecord.writeString(out, this.identifier);
-
- // numeric fields in the same order read() consumes them
- final int[] numericFields = { this.numberOfComputeUnits, this.numberOfCores, this.memorySize,
- this.diskCapacity, this.pricePerHour };
- for (final int field : numericFields) {
- out.writeInt(field);
- }
- }
-
- @Override
- public void read(final DataInput in) throws IOException {
- this.identifier = StringRecord.readString(in);
-
- // must mirror the order used by write()
- this.numberOfComputeUnits = in.readInt();
- this.numberOfCores = in.readInt();
- this.memorySize = in.readInt();
- this.diskCapacity = in.readInt();
- this.pricePerHour = in.readInt();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceTypeDescription.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceTypeDescription.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceTypeDescription.java
deleted file mode 100644
index ce0a694..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceTypeDescription.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.instance;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-
-/**
- * An instance type description provides details of an instance type. It can comprise both the hardware description
- * of the instance type (as provided by the operator/administrator of the instance) and the actual hardware
- * description which has been determined on the compute instance itself.
- */
-public final class InstanceTypeDescription implements IOReadableWritable {
-
- /**
- * The instance type, or <code>null</code> if none is set.
- */
- private InstanceType instanceType = null;
-
- /**
- * The hardware description as created by the {@link InstanceManager}, or <code>null</code> if none is available.
- */
- private HardwareDescription hardwareDescription = null;
-
- /**
- * The maximum number of available instances of this type.
- */
- private int maximumNumberOfAvailableInstances = 0;
-
- /**
- * Public default constructor required for serialization process.
- */
- public InstanceTypeDescription() {
- }
-
- /**
- * Constructs a new instance type description.
- *
- * @param instanceType
- * the instance type
- * @param hardwareDescription
- * the hardware description as created by the {@link InstanceManager}
- * @param maximumNumberOfAvailableInstances
- * the maximum number of available instances of this type
- */
- InstanceTypeDescription(final InstanceType instanceType, final HardwareDescription hardwareDescription,
- final int maximumNumberOfAvailableInstances) {
-
- this.instanceType = instanceType;
- this.hardwareDescription = hardwareDescription;
- this.maximumNumberOfAvailableInstances = maximumNumberOfAvailableInstances;
- }
-
- /**
- * Serializes this description. Each nullable reference (instance type, then hardware description) is preceded
- * by a boolean presence flag; the maximum number of available instances is written last.
- *
- * @param out
- * the output to write to
- * @throws IOException
- * if writing to {@code out} fails
- */
- @Override
- public void write(final DataOutput out) throws IOException {
-
- if (this.instanceType == null) {
- out.writeBoolean(false);
- } else {
- out.writeBoolean(true);
- this.instanceType.write(out);
- }
-
- if (this.hardwareDescription == null) {
- out.writeBoolean(false);
- } else {
- out.writeBoolean(true);
- this.hardwareDescription.write(out);
- }
-
- out.writeInt(this.maximumNumberOfAvailableInstances);
- }
-
- /**
- * Deserializes this description in the format produced by {@link #write(DataOutput)}, overwriting every field.
- * A reference whose presence flag is <code>false</code> is reset to <code>null</code>, so no value from a
- * previous read or construction survives.
- *
- * @param in
- * the input to read from
- * @throws IOException
- * if reading from {@code in} fails
- */
- @Override
- public void read(final DataInput in) throws IOException {
-
- if (in.readBoolean()) {
- this.instanceType = new InstanceType();
- this.instanceType.read(in);
- } else {
- this.instanceType = null;
- }
-
- if (in.readBoolean()) {
- this.hardwareDescription = new HardwareDescription();
- this.hardwareDescription.read(in);
- } else {
- // previously missing: a stale hardware description would survive re-reading into this object
- this.hardwareDescription = null;
- }
-
- this.maximumNumberOfAvailableInstances = in.readInt();
- }
-
- /**
- * Returns the hardware description as created by the {@link InstanceManager}.
- *
- * @return the instance's hardware description or <code>null</code> if no description is available
- */
- public HardwareDescription getHardwareDescription() {
- return this.hardwareDescription;
- }
-
- /**
- * Returns the instance type as determined by the {@link InstanceManager}.
- *
- * @return the instance type, or <code>null</code> if none is set
- */
- public InstanceType getInstanceType() {
- return this.instanceType;
- }
-
- /**
- * Returns the maximum number of instances the {@link InstanceManager} can at most allocate of this instance type
- * (i.e. when no other jobs are occupying any resources).
- *
- * @return the maximum number of instances of this type or <code>-1</code> if the number is unknown to the
- * {@link InstanceManager}
- */
- public int getMaximumNumberOfAvailableInstances() {
- return this.maximumNumberOfAvailableInstances;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceTypeDescriptionFactory.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceTypeDescriptionFactory.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceTypeDescriptionFactory.java
deleted file mode 100644
index 2b3e7db..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceTypeDescriptionFactory.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.instance;
-
-/**
- * This factory produces {@link InstanceTypeDescription} objects.
- * <p>
- * This class is stateless and therefore thread-safe; it cannot be instantiated.
- */
-public final class InstanceTypeDescriptionFactory {
-
- /**
- * Private constructor, so class cannot be instantiated.
- */
- private InstanceTypeDescriptionFactory() {
- }
-
- /**
- * Constructs a new {@link InstanceTypeDescription} object.
- *
- * @param instanceType
- * the instance type
- * @param hardwareDescription
- * the hardware description as created by the {@link InstanceManager}
- * @param numberOfAvailableInstances
- * the number of available instances of this type
- * @return the instance type description
- */
- public static InstanceTypeDescription construct(InstanceType instanceType, HardwareDescription hardwareDescription,
- int numberOfAvailableInstances) {
-
- return new InstanceTypeDescription(instanceType, hardwareDescription, numberOfAvailableInstances);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceTypeFactory.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceTypeFactory.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceTypeFactory.java
deleted file mode 100644
index ff501c4..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceTypeFactory.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.instance;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
- * This factory constructs {@link InstanceType} objects, either from explicit parameters or by parsing a
- * comma-separated hardware description string.
- * <p>
- * This class cannot be instantiated.
- */
-public final class InstanceTypeFactory {
-
- /**
- * The logger used to report errors.
- */
- private static final Log LOG = LogFactory.getLog(InstanceTypeFactory.class);
-
- /**
- * The pattern used to parse the hardware descriptions of instance types, i.e.
- * {@code identifier,computeUnits,cores,memorySize,diskCapacity,pricePerHour}.
- */
- private static final Pattern INSTANCE_TYPE_PATTERN = Pattern.compile("^([^,]+),(\\d+),(\\d+),(\\d+),(\\d+),(\\d+)$");
-
- /**
- * Private constructor, so class cannot be instantiated.
- */
- private InstanceTypeFactory() {
- }
-
- /**
- * Constructs an {@link InstanceType} object by parsing a hardware description string of the form
- * {@code identifier,computeUnits,cores,memorySize,diskCapacity,pricePerHour}.
- *
- * @param description
- * the hardware description reflected by this instance type, may be <code>null</code>
- * @return an instance type reflecting the given hardware description or <code>null</code> if the description is
- * <code>null</code>, does not match the expected format, or contains a number that does not fit into an
- * <code>int</code>
- */
- public static InstanceType constructFromDescription(String description) {
-
- if (description == null) {
- LOG.error("Cannot extract instance type from string " + description);
- return null;
- }
-
- final Matcher m = INSTANCE_TYPE_PATTERN.matcher(description);
- if (!m.matches()) {
- LOG.error("Cannot extract instance type from string " + description);
- return null;
- }
-
- final String identifier = m.group(1);
- final int numComputeUnits;
- final int numCores;
- final int memorySize;
- final int diskCapacity;
- final int pricePerHour;
- try {
- numComputeUnits = Integer.parseInt(m.group(2));
- numCores = Integer.parseInt(m.group(3));
- memorySize = Integer.parseInt(m.group(4));
- diskCapacity = Integer.parseInt(m.group(5));
- pricePerHour = Integer.parseInt(m.group(6));
- } catch (NumberFormatException e) {
- // the pattern only admits ASCII digits, so this can only be a value exceeding the int range
- LOG.error("Cannot extract instance type from string " + description, e);
- return null;
- }
-
- return new InstanceType(identifier, numComputeUnits, numCores, memorySize, diskCapacity, pricePerHour);
- }
-
- /**
- * Constructs an {@link InstanceType} from the given parameters.
- *
- * @param identifier
- * identifier for this instance type
- * @param numberOfComputeUnits
- * number of computational units of this instance type
- * @param numberOfCores
- * number of CPU cores of this instance type
- * @param memorySize
- * amount of main memory of this instance type (in MB)
- * @param diskCapacity
- * disk capacity of this instance type (in GB)
- * @param pricePerHour
- * price per hour that is charged for running instances of this type
- * @return the newly constructed instance type
- */
- public static InstanceType construct(String identifier, int numberOfComputeUnits, int numberOfCores,
- int memorySize, int diskCapacity, int pricePerHour) {
-
- return new InstanceType(identifier, numberOfComputeUnits, numberOfCores, memorySize, diskCapacity, pricePerHour);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/LocalInstanceManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/LocalInstanceManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/LocalInstanceManager.java
new file mode 100644
index 0000000..1576649
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/LocalInstanceManager.java
@@ -0,0 +1,60 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.instance;
+
+
+import eu.stratosphere.configuration.ConfigConstants;
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.configuration.GlobalConfiguration;
+import eu.stratosphere.nephele.ExecutionMode;
+import eu.stratosphere.nephele.taskmanager.TaskManager;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class LocalInstanceManager extends DefaultInstanceManager {
+
+ private List<TaskManager> taskManagers = new ArrayList<TaskManager>();
+
+ public LocalInstanceManager() throws Exception{
+ int numTaskManager = GlobalConfiguration.getInteger(ConfigConstants
+ .LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1);
+
+ ExecutionMode execMode = numTaskManager == 1 ? ExecutionMode.LOCAL : ExecutionMode.CLUSTER;
+
+ for (int i=0; i < numTaskManager; i++){
+ Configuration tm = new Configuration();
+ int ipcPort = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
+ ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT);
+ int dataPort = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+ ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT);
+
+ tm.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, ipcPort + i);
+ tm.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort + i);
+
+ GlobalConfiguration.includeConfiguration(tm);
+
+ taskManagers.add(new TaskManager(execMode));
+ }
+ }
+
+ @Override
+ public void shutdown(){
+ for(TaskManager taskManager: taskManagers){
+ taskManager.shutdown();
+ }
+
+ super.shutdown();
+ }
+}