You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/06/22 23:47:29 UTC
[08/22] Rework the Taskmanager to a slot based model and remove
legacy cloud code
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/AllocatedSlice.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/AllocatedSlice.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/AllocatedSlice.java
deleted file mode 100644
index 88d71e0..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/AllocatedSlice.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.instance.cluster;
-
-import eu.stratosphere.nephele.instance.AllocationID;
-import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.jobgraph.JobID;
-
-/**
- * An allocated slice is a part of an instance which is assigned to a job.
- * <p>
- * This class is thread-safe.
- *
- */
-class AllocatedSlice {
-
- /**
- * The allocation ID which identifies the resources occupied by this slice.
- * A fresh ID is generated by the constructor.
- */
- private final AllocationID allocationID;
-
- /**
- * The machine hosting the slice.
- */
- private final ClusterInstance hostingInstance;
-
- /**
- * The type describing the characteristics of the allocated slice.
- */
- private final InstanceType type;
-
- /**
- * The ID of the job this slice belongs to.
- */
- private final JobID jobID;
-
- /**
- * Time at which this slice was allocated, in milliseconds (cf. {@link System#currentTimeMillis()}).
- */
- private final long allocationTime;
-
- /**
- * Creates a new allocated slice on the given hosting instance. The slice receives a newly
- * generated {@link AllocationID}; all other attributes are taken from the arguments as-is.
- *
- * @param hostingInstance
- * the instance hosting the slice
- * @param type
- * the type describing the characteristics of the allocated slice
- * @param jobID
- * the ID of the job this slice belongs to
- * @param allocationTime
- * the time the slice was allocated, in milliseconds
- */
- public AllocatedSlice(final ClusterInstance hostingInstance, final InstanceType type, final JobID jobID,
- final long allocationTime) {
-
- this.allocationID = new AllocationID();
- this.hostingInstance = hostingInstance;
- this.type = type;
- this.jobID = jobID;
- this.allocationTime = allocationTime;
- }
-
- /**
- * Returns the allocation ID of this slice.
- *
- * @return the allocation ID of this slice
- */
- public AllocationID getAllocationID() {
- return this.allocationID;
- }
-
- /**
- * Returns the type describing the characteristics of
- * this allocated slice.
- *
- * @return the type describing the characteristics of the slice
- */
- public InstanceType getType() {
- return this.type;
- }
-
- /**
- * Returns the time the slice was allocated.
- *
- * @return the allocation time in milliseconds, exactly as passed to the constructor
- */
- public long getAllocationTime() {
- return this.allocationTime;
- }
-
- /**
- * Returns the ID of the job this allocated slice belongs to.
- *
- * @return the ID of the job this allocated slice belongs to
- */
- public JobID getJobID() {
- return this.jobID;
- }
-
- /**
- * Returns the instance hosting this slice.
- *
- * @return the instance hosting this slice
- */
- public ClusterInstance getHostingInstance() {
- return this.hostingInstance;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterInstance.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterInstance.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterInstance.java
deleted file mode 100644
index 5c50bd3..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterInstance.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.instance.cluster;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import eu.stratosphere.nephele.instance.AbstractInstance;
-import eu.stratosphere.nephele.instance.AllocationID;
-import eu.stratosphere.nephele.instance.HardwareDescription;
-import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
-import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.instance.InstanceTypeFactory;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.topology.NetworkNode;
-import eu.stratosphere.nephele.topology.NetworkTopology;
-
-/**
- * Representation of a host of a compute cluster.
- * <p>
- * This class is thread-safe.
- *
- */
-class ClusterInstance extends AbstractInstance {
-
-    /**
-     * Slices currently allocated on this host, keyed by their allocation ID.
-     */
-    private final Map<AllocationID, AllocatedSlice> allocatedSlices = new HashMap<AllocationID, AllocatedSlice>();
-
-    /**
-     * The part of this host's capacity that has not been handed out to slices.
-     */
-    private InstanceType remainingCapacity;
-
-    /**
-     * Timestamp (milliseconds) of the most recent heart beat reported for this instance;
-     * initialised to the creation time of this object.
-     */
-    private long lastReceivedHeartBeat = System.currentTimeMillis();
-
-    /**
-     * Constructs a new cluster instance whose remaining capacity initially equals its full capacity.
-     *
-     * @param instanceConnectionInfo
-     *        the instance connection info identifying the host
-     * @param capacity
-     *        capacity of this host
-     * @param parentNode
-     *        the parent node of this node in the network topology
-     * @param networkTopology
-     *        the network topology this node is part of
-     * @param hardwareDescription
-     *        the hardware description reported by the instance itself
-     */
-    public ClusterInstance(final InstanceConnectionInfo instanceConnectionInfo, final InstanceType capacity,
-            final NetworkNode parentNode, final NetworkTopology networkTopology,
-            final HardwareDescription hardwareDescription) {
-
-        super(capacity, instanceConnectionInfo, parentNode, networkTopology, hardwareDescription);
-
-        this.remainingCapacity = capacity;
-    }
-
-    /**
-     * Records the current system time as the time of the last received heart beat.
-     */
-    synchronized void reportHeartBeat() {
-        this.lastReceivedHeartBeat = System.currentTimeMillis();
-    }
-
-    /**
-     * Tells whether the last heart beat is recent enough for the host to count as alive.
-     *
-     * @param cleanUpInterval
-     *        duration (in milliseconds) after which a host without heart beats is considered dead
-     * @return <code>true</code> if the last heart beat plus <code>cleanUpInterval</code> has not yet
-     *         passed, <code>false</code> otherwise
-     */
-    synchronized boolean isStillAlive(final long cleanUpInterval) {
-
-        final long deadline = this.lastReceivedHeartBeat + cleanUpInterval;
-        return deadline >= System.currentTimeMillis();
-    }
-
-    /**
-     * Tries to carve a new slice out of the remaining capacity of this instance.
-     *
-     * @param reqType
-     *        the type describing the hardware characteristics of the slice
-     * @param jobID
-     *        the ID of the job the new slice belongs to
-     * @return a new {@link AllocatedSlice} if the requested compute units, cores, memory and disk
-     *         all fit into the remaining capacity, or <code>null</code> if any of them does not
-     */
-    synchronized AllocatedSlice createSlice(final InstanceType reqType, final JobID jobID) {
-
-        // bail out as soon as one resource dimension is insufficient
-        if (this.remainingCapacity.getNumberOfComputeUnits() < reqType.getNumberOfComputeUnits()
-            || this.remainingCapacity.getNumberOfCores() < reqType.getNumberOfCores()
-            || this.remainingCapacity.getMemorySize() < reqType.getMemorySize()
-            || this.remainingCapacity.getDiskCapacity() < reqType.getDiskCapacity()) {
-            return null;
-        }
-
-        // subtract the request from what is left; identifier and price are carried over
-        this.remainingCapacity = InstanceTypeFactory.construct(
-            this.remainingCapacity.getIdentifier(),
-            this.remainingCapacity.getNumberOfComputeUnits() - reqType.getNumberOfComputeUnits(),
-            this.remainingCapacity.getNumberOfCores() - reqType.getNumberOfCores(),
-            this.remainingCapacity.getMemorySize() - reqType.getMemorySize(),
-            this.remainingCapacity.getDiskCapacity() - reqType.getDiskCapacity(),
-            this.remainingCapacity.getPricePerHour());
-
-        final AllocatedSlice created = new AllocatedSlice(this, reqType, jobID, System.currentTimeMillis());
-        this.allocatedSlices.put(created.getAllocationID(), created);
-        return created;
-    }
-
-    /**
-     * Removes the slice with the given allocation ID and returns its resources to the
-     * remaining capacity of this instance.
-     *
-     * @param allocationID
-     *        the allocation ID of the slice to be removed
-     * @return the slice which has been removed from the instance or <code>null</code> if no slice
-     *         with the given allocation ID could be found
-     */
-    synchronized AllocatedSlice removeAllocatedSlice(final AllocationID allocationID) {
-
-        final AllocatedSlice removed = this.allocatedSlices.remove(allocationID);
-        if (removed == null) {
-            return null;
-        }
-
-        // add the slice's resources back; identifier and price are carried over
-        this.remainingCapacity = InstanceTypeFactory.construct(
-            this.remainingCapacity.getIdentifier(),
-            this.remainingCapacity.getNumberOfComputeUnits() + removed.getType().getNumberOfComputeUnits(),
-            this.remainingCapacity.getNumberOfCores() + removed.getType().getNumberOfCores(),
-            this.remainingCapacity.getMemorySize() + removed.getType().getMemorySize(),
-            this.remainingCapacity.getDiskCapacity() + removed.getType().getDiskCapacity(),
-            this.remainingCapacity.getPricePerHour());
-
-        return removed;
-    }
-
-    /**
-     * Removes every slice allocated on this instance and frees the resources they occupied.
-     *
-     * @return a list of all removed slices
-     */
-    synchronized List<AllocatedSlice> removeAllAllocatedSlices() {
-
-        // iterate over a snapshot, since removing a slice modifies the underlying map
-        final List<AllocatedSlice> snapshot = new ArrayList<AllocatedSlice>(this.allocatedSlices.values());
-        for (final AllocatedSlice slice : snapshot) {
-            removeAllocatedSlice(slice.getAllocationID());
-        }
-
-        return snapshot;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterInstanceNotifier.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterInstanceNotifier.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterInstanceNotifier.java
deleted file mode 100644
index 39d2132..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterInstanceNotifier.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.instance.cluster;
-
-import java.util.List;
-
-import eu.stratosphere.nephele.instance.AllocatedResource;
-import eu.stratosphere.nephele.instance.InstanceListener;
-import eu.stratosphere.nephele.jobgraph.JobID;
-
-/**
- * This class is an auxiliary class to send the notification about the availability of an
- * {@link eu.stratosphere.nephele.instance.AbstractInstance} to the given {@link InstanceListener} object.
- * The notification must be sent from
- * a separate thread, otherwise the atomic operation of requesting an instance
- * for a vertex and switching to the state ASSIGNING could not be guaranteed.
- * This class is thread-safe.
- *
- */
-public class ClusterInstanceNotifier extends Thread {
-
- /**
- * The {@link InstanceListener} object to send the notification to.
- */
- private final InstanceListener instanceListener;
-
- /**
- * The ID of the job the notification refers to.
- */
- private final JobID jobID;
-
- /**
- * The allocated resources the notification refers to.
- */
- private final List<AllocatedResource> allocatedResources;
-
- /**
- * Constructs a new instance notifier object. The notification itself is only sent once the
- * thread is run.
- *
- * @param instanceListener
- * the listener to send the notification to
- * @param jobID
- * the ID of the job the notification refers to
- * @param allocatedResources
- * the resources which have been allocated for the job
- */
- public ClusterInstanceNotifier(final InstanceListener instanceListener, final JobID jobID,
- final List<AllocatedResource> allocatedResources) {
- this.instanceListener = instanceListener;
- this.jobID = jobID;
- this.allocatedResources = allocatedResources;
- }
-
-
- /**
- * Delivers the notification by calling {@link InstanceListener#resourcesAllocated} with the job ID
- * and the resources given at construction time.
- */
- @Override
- public void run() {
-
- this.instanceListener.resourcesAllocated(this.jobID, this.allocatedResources);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterManager.java
deleted file mode 100644
index 480e521..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterManager.java
+++ /dev/null
@@ -1,945 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.instance.cluster;
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.configuration.ConfigConstants;
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.configuration.GlobalConfiguration;
-import eu.stratosphere.nephele.instance.AbstractInstance;
-import eu.stratosphere.nephele.instance.AllocatedResource;
-import eu.stratosphere.nephele.instance.HardwareDescription;
-import eu.stratosphere.nephele.instance.HardwareDescriptionFactory;
-import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
-import eu.stratosphere.nephele.instance.InstanceException;
-import eu.stratosphere.nephele.instance.InstanceListener;
-import eu.stratosphere.nephele.instance.InstanceManager;
-import eu.stratosphere.nephele.instance.InstanceRequestMap;
-import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.instance.InstanceTypeDescription;
-import eu.stratosphere.nephele.instance.InstanceTypeDescriptionFactory;
-import eu.stratosphere.nephele.instance.InstanceTypeFactory;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.topology.NetworkNode;
-import eu.stratosphere.nephele.topology.NetworkTopology;
-import eu.stratosphere.nephele.util.SerializableHashMap;
-
-/**
- * Instance Manager for a static cluster.
- * <p>
- * The cluster manager can handle heterogeneous instances (compute nodes). Each instance type used in the cluster must
- * be described in the configuration.
- * <p>
- * This is a sample configuration: <code>
- * # definition of instances in format
- * # instancename,numComputeUnits,numCores,memorySize,diskCapacity,pricePerHour
- * instancemanager.cluster.type.1 = m1.small,2,1,2048,10,10
- * instancemanager.cluster.type.2 = c1.medium,2,1,2048,10,10
- * instancemanager.cluster.type.3 = m1.large,4,2,2048,10,10
- * instancemanager.cluster.type.4 = m1.xlarge,8,4,8192,20,20
- * instancemanager.cluster.type.5 = c1.xlarge,8,4,16384,20,40
- *
- * # default instance type
- * instancemanager.cluster.defaulttype = 1 (pointing to m1.small)
- * </code> Each instance is expected to run exactly one {@link eu.stratosphere.nephele.taskmanager.TaskManager}. When
- * the {@link eu.stratosphere.nephele.taskmanager.TaskManager} registers with the
- * {@link eu.stratosphere.nephele.jobmanager.JobManager} it sends a {@link HardwareDescription} which describes the
- * actual hardware characteristics of the instance (compute node). The cluster manager will attempt to match the reported
- * hardware characteristics with one of the configured instance types. Moreover, the cluster manager is capable of
- * partitioning larger instances (compute nodes) into smaller, less powerful instances.
- */
-public class ClusterManager implements InstanceManager {
-
- // ------------------------------------------------------------------------
- // Internal Constants
- // ------------------------------------------------------------------------
-
- /**
- * The log object used to report debugging and error information.
- */
- private static final Log LOG = LogFactory.getLog(ClusterManager.class);
-
- /**
- * Default duration after which a host is purged in case it did not send
- * a heart-beat message.
- */
- private static final int DEFAULT_CLEANUP_INTERVAL = 2 * 60; // 2 min.
-
- /**
- * The key to retrieve the clean up interval from the configuration.
- */
- private static final String CLEANUP_INTERVAL_KEY = "instancemanager.cluster.cleanupinterval";
-
- // ------------------------------------------------------------------------
- // Fields
- // ------------------------------------------------------------------------
-
- private final Object lock = new Object();
-
- /**
- * Duration after which a host is purged in case it did not send a
- * heart-beat message.
- */
- private final long cleanUpInterval;
-
- /**
- * The default instance type.
- */
- private final InstanceType defaultInstanceType;
-
- /**
- * Set of hosts known to run a task manager that are thus able to execute
- * tasks.
- */
- private final Map<InstanceConnectionInfo, ClusterInstance> registeredHosts;
-
- /**
- * Map of a {@link JobID} to all {@link AllocatedSlice}s that belong to this job.
- */
- private final Map<JobID, List<AllocatedSlice>> slicesOfJobs;
-
- /**
- * List of instance types that can be executed on this cluster. The constructor sorts
- * this array by number of CPU cores in descending order.
- */
- private final InstanceType[] availableInstanceTypes;
-
- /**
- * Map of instance type descriptions which can be queried by the job manager.
- */
- private final Map<InstanceType, InstanceTypeDescription> instanceTypeDescriptionMap;
-
- /**
- * Map of IP addresses to instance types.
- */
- private final Map<InetAddress, InstanceType> ipToInstanceTypeMapping = new HashMap<InetAddress, InstanceType>();
-
- /**
- * Map of pending requests of a job, i.e. the instance requests that could not be fulfilled during the initial
- * instance request.
- */
- private final Map<JobID, PendingRequestsMap> pendingRequestsOfJob = new LinkedHashMap<JobID, PendingRequestsMap>();
-
- /**
- * The network topology of the cluster.
- */
- private final NetworkTopology networkTopology;
-
- /**
- * Object that is notified if instances become available or vanish.
- */
- private InstanceListener instanceListener;
-
- /**
- * Matrix storing how many instances of a particular type can be accommodated in another instance type.
- */
- private final int[][] instanceAccommodationMatrix;
-
- private boolean shutdown;
-
- /**
-  * Periodic task that purges every registered host whose heart-beat is overdue (as decided by
-  * {@link ClusterInstance#isStillAlive(long)} with {@code cleanUpInterval}). All slices on such a
-  * host are removed, the bookkeeping in {@code slicesOfJobs} is updated and the instance listener,
-  * if one is set, is told which resources died, grouped by job.
-  * <p>
-  * NOTE(review): the listener is invoked while the manager lock is still held.
-  */
- private final TimerTask cleanupStaleMachines = new TimerTask() {
-
-     @Override
-     public void run() {
-
-         synchronized (ClusterManager.this.lock) {
-
-             final List<Map.Entry<InstanceConnectionInfo, ClusterInstance>> hostsToRemove =
-                 new ArrayList<Map.Entry<InstanceConnectionInfo, ClusterInstance>>();
-
-             final Map<JobID, List<AllocatedResource>> staleResources = new HashMap<JobID, List<AllocatedResource>>();
-
-             // check all hosts whether they did not send heart-beat messages.
-             for (Map.Entry<InstanceConnectionInfo, ClusterInstance> entry : registeredHosts.entrySet()) {
-
-                 final ClusterInstance host = entry.getValue();
-                 if (!host.isStillAlive(cleanUpInterval)) {
-
-                     // this host has not sent the heart-beat messages
-                     // -> we terminate all instances running on this host and notify the jobs
-                     final List<AllocatedSlice> removedSlices = host.removeAllAllocatedSlices();
-                     for (AllocatedSlice removedSlice : removedSlices) {
-
-                         final JobID jobID = removedSlice.getJobID();
-                         final List<AllocatedSlice> slicesOfJob = slicesOfJobs.get(jobID);
-                         if (slicesOfJob == null) {
-                             LOG.error("Cannot find allocated slices for job with ID " + jobID);
-                             continue;
-                         }
-
-                         slicesOfJob.remove(removedSlice);
-
-                         // Clean up: drop the job entry once its last slice is gone
-                         if (slicesOfJob.isEmpty()) {
-                             slicesOfJobs.remove(jobID);
-                         }
-
-                         List<AllocatedResource> staleResourcesOfJob = staleResources.get(jobID);
-                         if (staleResourcesOfJob == null) {
-                             staleResourcesOfJob = new ArrayList<AllocatedResource>();
-                             staleResources.put(jobID, staleResourcesOfJob);
-                         }
-
-                         staleResourcesOfJob.add(new AllocatedResource(removedSlice.getHostingInstance(),
-                             removedSlice.getType(),
-                             removedSlice.getAllocationID()));
-                     }
-
-                     // removal is deferred so the map is not modified while it is being iterated
-                     hostsToRemove.add(entry);
-                 }
-             }
-
-             registeredHosts.entrySet().removeAll(hostsToRemove);
-
-             updateInstaceTypeDescriptionMap();
-
-             for (final Map.Entry<JobID, List<AllocatedResource>> entry : staleResources.entrySet()) {
-                 if (instanceListener != null) {
-                     instanceListener.allocatedResourcesDied(entry.getKey(), entry.getValue());
-                 }
-             }
-         }
-     }
- };
-
- // ------------------------------------------------------------------------
- // Constructor and set-up
- // ------------------------------------------------------------------------
-
- /**
-  * Creates the cluster manager: sets up the host and slice bookkeeping, loads the default
-  * instance type, reads the clean-up interval from the global configuration and starts a
-  * daemon timer that runs the stale-host check once per second.
-  * <p>
-  * The clean-up interval is configured in seconds under {@code instancemanager.cluster.cleanupinterval}
-  * and kept internally in milliseconds. Values below ten seconds are rejected and replaced by the
-  * default interval.
-  */
- public ClusterManager() {
-
-     this.registeredHosts = new HashMap<InstanceConnectionInfo, ClusterInstance>();
-
-     this.slicesOfJobs = new HashMap<JobID, List<AllocatedSlice>>();
-
-     // Load the instance type this cluster can offer
-     this.defaultInstanceType = InstanceTypeFactory.constructFromDescription(ConfigConstants.DEFAULT_INSTANCE_TYPE);
-
-     this.availableInstanceTypes = new InstanceType[] { this.defaultInstanceType };
-
-     this.instanceAccommodationMatrix = calculateInstanceAccommodationMatrix();
-
-     this.instanceTypeDescriptionMap = new SerializableHashMap<InstanceType, InstanceTypeDescription>();
-
-     // configured value is in seconds, everything below works in milliseconds
-     long tmpCleanUpInterval = (long) GlobalConfiguration.getInteger(CLEANUP_INTERVAL_KEY, DEFAULT_CLEANUP_INTERVAL) * 1000;
-
-     if (tmpCleanUpInterval < 10 * 1000L) { // Clean up interval must be at least ten seconds
-         LOG.warn("Invalid clean up interval. Reverting to default cleanup interval of " + DEFAULT_CLEANUP_INTERVAL
-             + " secs.");
-         // the default constant is in seconds as well, so it needs the same conversion
-         tmpCleanUpInterval = DEFAULT_CLEANUP_INTERVAL * 1000L;
-     }
-
-     this.cleanUpInterval = tmpCleanUpInterval;
-
-     // sort available instances by CPU core
-     sortAvailableInstancesByNumberOfCPUCores();
-
-     this.networkTopology = NetworkTopology.createEmptyTopology();
-
-     // look for crashed hosts every second, starting one second from now
-     final boolean runTimerAsDaemon = true;
-     new Timer(runTimerAsDaemon).schedule(cleanupStaleMachines, 1000, 1000);
-
-     // Load available instance types into the instance description list
-     updateInstaceTypeDescriptionMap();
- }
-
- /**
-  * Reorders {@code availableInstanceTypes} in place so that types with more CPU cores come first.
-  * Types with the same number of cores keep their relative order.
-  */
- private void sortAvailableInstancesByNumberOfCPUCores() {
-
-     final InstanceType[] types = this.availableInstanceTypes;
-
-     // insertion sort; with fewer than two entries the loop body never runs
-     for (int next = 1; next < types.length; next++) {
-         final InstanceType pending = types[next];
-         int slot = next - 1;
-         // shift every entry with strictly fewer cores one position to the right
-         while (slot >= 0 && types[slot].getNumberOfCores() < pending.getNumberOfCores()) {
-             types[slot + 1] = types[slot];
-             slot--;
-         }
-         types[slot + 1] = pending;
-     }
- }
-
- // Stops the stale-host check, destroys the proxies of all registered hosts and forgets them.
- // Calls after the first successful shutdown do nothing.
- @Override
- public void shutdown() {
-     synchronized (this.lock) {
-         if (!this.shutdown) {
-
-             this.cleanupStaleMachines.cancel();
-
-             for (final ClusterInstance host : this.registeredHosts.values()) {
-                 host.destroyProxies();
-             }
-             this.registeredHosts.clear();
-
-             this.shutdown = true;
-         }
-     }
- }
-
- // Returns the default instance type. The backing field is final and assigned in the
- // constructor, so no locking is done here.
- @Override
- public InstanceType getDefaultInstanceType() {
- return this.defaultInstanceType;
- }
-
- // Looks up an available instance type by its identifier; returns null when none matches.
- @Override
- public InstanceType getInstanceTypeByName(String instanceTypeName) {
-     synchronized (this.lock) {
-         for (int idx = 0; idx < this.availableInstanceTypes.length; idx++) {
-             final InstanceType candidate = this.availableInstanceTypes[idx];
-             if (candidate.getIdentifier().equals(instanceTypeName)) {
-                 return candidate;
-             }
-         }
-         return null;
-     }
- }
-
-
- // Returns the first available instance type that offers at least the requested compute units,
- // cores, memory and disk capacity and costs at most maxPricePerHour, or null if there is none.
- @Override
- public InstanceType getSuitableInstanceType(int minNumComputeUnits, int minNumCPUCores,
- int minMemorySize, int minDiskCapacity, int maxPricePerHour)
- {
- // The first instance type in array order that fulfills the requirements is returned.
- // NOTE(review): this was documented as "sorted by price -> first match is the cheapest", but the
- // constructor sorts the array by descending number of CPU cores - confirm the intended ordering.
-
- synchronized (this.lock) {
- for (InstanceType i : availableInstanceTypes) {
- if (i.getNumberOfComputeUnits() >= minNumComputeUnits && i.getNumberOfCores() >= minNumCPUCores
- && i.getMemorySize() >= minMemorySize && i.getDiskCapacity() >= minDiskCapacity
- && i.getPricePerHour() <= maxPricePerHour) {
- return i;
- }
- }
- }
- return null;
- }
-
-
- // Gives the slice behind the allocated resource back to its host, drops it from the job's
- // bookkeeping and then re-checks the pending requests. If the job has no bookkeeping entry,
- // an error is logged and the pending requests are not re-checked.
- @Override
- public void releaseAllocatedResource(JobID jobID, Configuration conf,
-         AllocatedResource allocatedResource) throws InstanceException
- {
-     synchronized (this.lock) {
-         // free the slice on the hosting instance first
-         final ClusterInstance host = (ClusterInstance) allocatedResource.getInstance();
-         final AllocatedSlice released = host.removeAllocatedSlice(allocatedResource.getAllocationID());
-
-         // then remove the local association between slice and job
-         final List<AllocatedSlice> jobSlices = this.slicesOfJobs.get(jobID);
-         if (jobSlices != null) {
-
-             jobSlices.remove(released);
-
-             // forget the job once its last slice is gone
-             if (jobSlices.isEmpty()) {
-                 this.slicesOfJobs.remove(jobID);
-             }
-
-             // freed capacity may satisfy requests that are still waiting
-             checkPendingRequests();
-         } else {
-             LOG.error("Cannot find allocated slice to release allocated slice for job " + jobID);
-         }
-     }
- }
-
- /**
- * Creates a new {@link ClusterInstance} object to manage instances that can
- * be executed on that host. The instance type is taken from the user-defined IP mapping if one
- * exists, otherwise it is derived from the hardware description. The new host is attached to the
- * network topology in place of a matching stub node, or below the root node if no stub matches.
- *
- * @param instanceConnectionInfo
- * the connection information for the instance
- * @param hardwareDescription
- * the hardware description provided by the new instance
- * @return a new {@link ClusterInstance} object or <code>null</code> if no instance type could be
- * determined for the host
- */
- private ClusterInstance createNewHost(final InstanceConnectionInfo instanceConnectionInfo,
- final HardwareDescription hardwareDescription) {
-
- // Check if there is a user-defined instance type for this IP address
- InstanceType instanceType = this.ipToInstanceTypeMapping.get(instanceConnectionInfo.address());
- if (instanceType != null) {
- LOG.info("Found user-defined instance type for cluster instance with IP "
- + instanceConnectionInfo.address() + ": " + instanceType);
- } else {
- instanceType = matchHardwareDescriptionWithInstanceType(hardwareDescription);
- if (instanceType != null) {
- LOG.info("Hardware profile of cluster instance with IP " + instanceConnectionInfo.address()
- + " matches with instance type " + instanceType);
- } else {
- LOG.error("No matching instance type, cannot create cluster instance");
- return null;
- }
- }
-
- // Try to match new host with a stub host from the existing topology
- String instanceName = instanceConnectionInfo.hostname();
- NetworkNode parentNode = this.networkTopology.getRootNode();
- NetworkNode currentStubNode = null;
-
- // Try to match new host using the host name
- while (true) {
-
- currentStubNode = this.networkTopology.getNodeByName(instanceName);
- if (currentStubNode != null) {
- break;
- }
-
- final int pos = instanceName.lastIndexOf('.');
- if (pos == -1) {
- break;
- }
-
- /*
- * If the host name is reported as FQDN, iteratively remove parts
- * of the domain name until a match occurs or no more dots
- * can be found in the host name.
- */
- instanceName = instanceName.substring(0, pos);
- }
-
- // Try to match the new host using the IP address
- if (currentStubNode == null) {
- instanceName = instanceConnectionInfo.address().toString();
- instanceName = instanceName.replaceAll("/", ""); // Remove any / characters
- currentStubNode = this.networkTopology.getNodeByName(instanceName);
- }
-
- if (currentStubNode != null) {
- /*
- * The instance name will be the same as the one of the stub node. That way
- * the stub node will be removed from the network topology and replaced by
- * the new node.
- */
- if (currentStubNode.getParentNode() != null) {
- parentNode = currentStubNode.getParentNode();
- }
- // Remove the stub node from the tree
- currentStubNode.remove();
- }
-
- LOG.info("Creating instance of type " + instanceType + " for " + instanceConnectionInfo + ", parent is "
- + parentNode.getName());
- final ClusterInstance host = new ClusterInstance(instanceConnectionInfo, instanceType, parentNode,
- this.networkTopology, hardwareDescription);
-
- return host;
- }
-
- /**
-  * Picks the first configured instance type whose core and memory demands are both covered by the hardware
-  * reported in the given {@link HardwareDescription}. The match is pessimistic: a candidate is only accepted if
-  * neither its number of cores nor its memory size (compared in MB) exceeds what the hardware reports.
-  * <p>
-  * The candidates are tried in the order of the list of available instance types, which is assumed to be sorted
-  * by number of CPU cores in descending order.
-  *
-  * @param hardwareDescription
-  *        the hardware description as reported by the instance
-  * @return the first instance type that fits, or <code>null</code> if none of the available types fits
-  */
- private InstanceType matchHardwareDescriptionWithInstanceType(final HardwareDescription hardwareDescription) {
-
-     for (final InstanceType candidate : this.availableInstanceTypes) {
-
-         // The candidate must not ask for more cores than the machine has
-         final boolean enoughCores = candidate.getNumberOfCores() <= hardwareDescription.getNumberOfCPUCores();
-         if (!enoughCores) {
-             continue;
-         }
-
-         // The candidate must not ask for more memory (in MB) than the machine has
-         final int availableMemoryInMB = (int) (hardwareDescription.getSizeOfPhysicalMemory() / (1024L * 1024L));
-         if (candidate.getMemorySize() <= availableMemoryInMB) {
-             return candidate;
-         }
-     }
-
-     LOG.error("Cannot find matching instance type for hardware description ("
-         + hardwareDescription.getNumberOfCPUCores() + " cores, " + hardwareDescription.getSizeOfPhysicalMemory()
-         + " bytes of memory)");
-
-     return null;
- }
-
-
- /**
-  * Processes a heart beat. A host that is already registered just gets its heart beat recorded. For an unknown
-  * host a new cluster instance is created and registered, the instance type descriptions are refreshed and the
-  * pending requests are re-checked before the heart beat is recorded.
-  */
- @Override
- public void reportHeartBeat(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription hardwareDescription) {
-
-     synchronized (this.lock) {
-
-         final ClusterInstance knownHost = registeredHosts.get(instanceConnectionInfo);
-         if (knownHost != null) {
-             knownHost.reportHeartBeat();
-             return;
-         }
-
-         // First heart beat from this host: try to set up a new instance for it
-         final ClusterInstance newHost = createNewHost(instanceConnectionInfo, hardwareDescription);
-         if (newHost == null) {
-             LOG.error("Could not create a new host object for incoming heart-beat. "
-                 + "Probably the configuration file is lacking some entries.");
-             return;
-         }
-
-         this.registeredHosts.put(instanceConnectionInfo, newHost);
-         LOG.info("New number of registered hosts is " + this.registeredHosts.size());
-
-         // The new host changes the set of available instance types ...
-         updateInstaceTypeDescriptionMap();
-
-         // ... and may allow requests that could not be served so far to be fulfilled
-         checkPendingRequests();
-
-         newHost.reportHeartBeat();
-     }
- }
-
- /**
-  * Checks if pending requests can be fulfilled with the currently registered hosts.
-  * <p>
-  * For every job with pending requests and for every requested instance type, slices are allocated until either
-  * the pending counter reaches zero or no further slice can be obtained. Each allocated slice is added to the
-  * job's slice bookkeeping. If at least one slice was allocated for a job and an instance listener is set, a
-  * {@link ClusterInstanceNotifier} is started with the newly allocated resources of that job.
-  * <p>
-  * The pending counters are updated through the iterator itself (<code>setValue</code> / <code>remove</code>).
-  * Removing an exhausted entry via {@link PendingRequestsMap#decreaseNumberOfPendingInstances(InstanceType)} would
-  * structurally modify the map that is currently being iterated and make the next call to <code>next()</code>
-  * fail with a {@link java.util.ConcurrentModificationException} as soon as a job has more than one pending
-  * instance type.
-  */
- private void checkPendingRequests() {
-
-     final Iterator<Map.Entry<JobID, PendingRequestsMap>> it = this.pendingRequestsOfJob.entrySet().iterator();
-     while (it.hasNext()) {
-
-         final List<AllocatedResource> allocatedResources = new ArrayList<AllocatedResource>();
-         final Map.Entry<JobID, PendingRequestsMap> entry = it.next();
-         final JobID jobID = entry.getKey();
-         final PendingRequestsMap pendingRequestsMap = entry.getValue();
-         final Iterator<Map.Entry<InstanceType, Integer>> it2 = pendingRequestsMap.iterator();
-         while (it2.hasNext()) {
-
-             final Map.Entry<InstanceType, Integer> entry2 = it2.next();
-             final InstanceType requestedInstanceType = entry2.getKey();
-             int numberOfPendingInstances = entry2.getValue().intValue();
-
-             // Consistency check
-             if (numberOfPendingInstances <= 0) {
-                 LOG.error("Inconsistency: Job " + jobID + " has " + numberOfPendingInstances
-                     + " requests for instance type " + requestedInstanceType.getIdentifier());
-                 continue;
-             }
-
-             while (numberOfPendingInstances > 0) {
-
-                 if (LOG.isDebugEnabled()) {
-                     LOG.debug("Trying to allocate instance of type " + requestedInstanceType.getIdentifier());
-                 }
-
-                 // TODO: Introduce topology awareness here
-                 final AllocatedSlice slice = getSliceOfType(jobID, requestedInstanceType);
-                 if (slice == null) {
-                     // No capacity left for this type, keep the remaining requests pending
-                     break;
-                 }
-
-                 LOG.info("Allocated instance of type " + requestedInstanceType.getIdentifier()
-                     + " as a result of pending request for job " + jobID);
-
-                 // Decrease number of pending instances. The bookkeeping is updated through the iterator so
-                 // that the underlying map is never modified behind the iterator's back.
-                 --numberOfPendingInstances;
-                 if (numberOfPendingInstances == 0) {
-                     // Entry is exhausted; the loop terminates afterwards and entry2 is not used again
-                     it2.remove();
-                 } else {
-                     entry2.setValue(Integer.valueOf(numberOfPendingInstances));
-                 }
-
-                 List<AllocatedSlice> allocatedSlices = this.slicesOfJobs.get(jobID);
-                 if (allocatedSlices == null) {
-                     allocatedSlices = new ArrayList<AllocatedSlice>();
-                     this.slicesOfJobs.put(jobID, allocatedSlices);
-                 }
-                 allocatedSlices.add(slice);
-
-                 allocatedResources.add(new AllocatedResource(slice.getHostingInstance(), slice.getType(), slice
-                     .getAllocationID()));
-             }
-         }
-
-         // Report the newly allocated resources of this job, if there are any and somebody is listening
-         if (!allocatedResources.isEmpty() && this.instanceListener != null) {
-
-             final ClusterInstanceNotifier clusterInstanceNotifier = new ClusterInstanceNotifier(
-                 this.instanceListener, jobID, allocatedResources);
-
-             clusterInstanceNotifier.start();
-         }
-     }
- }
-
- /**
- * Attempts to allocate a slice of the given type for the given job. The method first attempts to allocate this
- * slice by finding a physical host which exactly matches the given instance type. If this attempt failed, it tries
- * to allocate the slice by partitioning the resources of a more powerful host.
- *
- * @param jobID
- * the ID of the job the slice shall be allocated for
- * @param instanceType
- * the instance type of the requested slice
- * @return the allocated slice or <code>null</code> if no such slice could be allocated
- */
- private AllocatedSlice getSliceOfType(final JobID jobID, final InstanceType instanceType) {
-
- AllocatedSlice slice = null;
-
- // Try to match the instance type without slicing first
- for (final ClusterInstance host : this.registeredHosts.values()) {
- if (host.getType().equals(instanceType)) {
- slice = host.createSlice(instanceType, jobID);
- if (slice != null) {
- break;
- }
- }
- }
-
- // Use slicing now if necessary
- if (slice == null) {
-
- for (final ClusterInstance host : this.registeredHosts.values()) {
- slice = host.createSlice(instanceType, jobID);
- if (slice != null) {
- break;
- }
- }
- }
-
- return slice;
- }
-
-
- /**
-  * Tries to allocate the instances described by the given request map for the given job.
-  * <p>
-  * For every instance type in the request map, up to the maximum number of instances is allocated. If an
-  * allocation fails before the minimum number for that type has been reached, all slices allocated during this
-  * call are released again and an {@link InstanceException} is thrown. If it fails after the minimum has been
-  * reached, the missing instances are recorded as pending requests of the job. On success the job's slice and
-  * pending-request bookkeeping is updated and, if an instance listener is set, a {@link ClusterInstanceNotifier}
-  * is started with the newly allocated resources.
-  * <p>
-  * The parameters <code>conf</code> and <code>splitAffinityList</code> are not used by this implementation.
-  *
-  * @param jobID
-  *        the ID of the job the instances are requested for
-  * @param conf
-  *        unused
-  * @param instanceRequestMap
-  *        the minimum and maximum number of instances requested per instance type
-  * @param splitAffinityList
-  *        unused
-  * @throws InstanceException
-  *         thrown if the minimum number of instances of some type cannot be allocated
-  */
- @Override
- public void requestInstance(JobID jobID, Configuration conf, InstanceRequestMap instanceRequestMap, List<String> splitAffinityList)
- throws InstanceException
- {
- // Slices obtained during this call; kept separately so they can be rolled back on failure
- final List<AllocatedSlice> newlyAllocatedSlicesOfJob = new ArrayList<AllocatedSlice>();
- // Number of instances per type which could not be allocated right away
- final Map<InstanceType, Integer> pendingRequests = new HashMap<InstanceType, Integer>();
-
- synchronized(this.lock) {
- // Iterate over all instance types
- for (Iterator<Map.Entry<InstanceType, Integer>> it = instanceRequestMap.getMaximumIterator(); it.hasNext();) {
-
- // Iterate over all requested instances of a specific type
- final Map.Entry<InstanceType, Integer> entry = it.next();
- final int maximumNumberOfInstances = entry.getValue().intValue();
-
- for (int i = 0; i < maximumNumberOfInstances; i++) {
-
- LOG.info("Trying to allocate instance of type " + entry.getKey().getIdentifier());
-
- final AllocatedSlice slice = getSliceOfType(jobID, entry.getKey());
-
- if (slice == null) {
- // i is the number of instances of this type allocated so far
- if (i < instanceRequestMap.getMinimumNumberOfInstances(entry.getKey())) {
- // The request cannot be fulfilled, release the slices again and throw an exception
- for (final AllocatedSlice sliceToRelease : newlyAllocatedSlicesOfJob) {
- sliceToRelease.getHostingInstance().removeAllocatedSlice(sliceToRelease.getAllocationID());
- }
-
- // All slices allocated during this call have been released above
- throw new InstanceException("Could not find a suitable instance");
- } else {
-
- // Remaining instances are pending
- final int numberOfRemainingInstances = maximumNumberOfInstances - i;
- if (numberOfRemainingInstances > 0) {
-
- // Store the request for the missing instances
- Integer val = pendingRequests.get(entry.getKey());
- if (val == null) {
- val = Integer.valueOf(0);
- }
- val = Integer.valueOf(val.intValue() + numberOfRemainingInstances);
- pendingRequests.put(entry.getKey(), val);
- }
-
- // Stop trying for this type and continue with the next instance type
- break;
- }
- }
-
- newlyAllocatedSlicesOfJob.add(slice);
- }
- }
-
- // The request could be processed successfully, so update internal bookkeeping.
- List<AllocatedSlice> allAllocatedSlicesOfJob = this.slicesOfJobs.get(jobID);
- if (allAllocatedSlicesOfJob == null) {
- allAllocatedSlicesOfJob = new ArrayList<AllocatedSlice>();
- this.slicesOfJobs.put(jobID, allAllocatedSlicesOfJob);
- }
- allAllocatedSlicesOfJob.addAll(newlyAllocatedSlicesOfJob);
-
- // Merge the requests which could not be served into the pending requests of the job
- PendingRequestsMap allPendingRequestsOfJob = this.pendingRequestsOfJob.get(jobID);
- if (allPendingRequestsOfJob == null) {
- allPendingRequestsOfJob = new PendingRequestsMap();
- this.pendingRequestsOfJob.put(jobID, allPendingRequestsOfJob);
- }
- for (final Iterator<Map.Entry<InstanceType, Integer>> it = pendingRequests.entrySet().iterator(); it.hasNext();) {
- final Map.Entry<InstanceType, Integer> entry = it.next();
-
- allPendingRequestsOfJob.addRequest(entry.getKey(), entry.getValue().intValue());
- }
-
- // Finally, create the list of allocated resources for the scheduler
- final List<AllocatedResource> allocatedResources = new ArrayList<AllocatedResource>();
- for (final AllocatedSlice slice : newlyAllocatedSlicesOfJob) {
- allocatedResources.add(new AllocatedResource(slice.getHostingInstance(), slice.getType(), slice
- .getAllocationID()));
- }
-
- // NOTE(review): unlike checkPendingRequests(), the notifier is started even if the list of allocated
- // resources is empty -- confirm that the listener copes with an empty list
- if (this.instanceListener != null) {
- final ClusterInstanceNotifier clusterInstanceNotifier = new ClusterInstanceNotifier(
- this.instanceListener, jobID, allocatedResources);
- clusterInstanceNotifier.start();
- }
- }
- }
-
-
- /**
-  * Returns the network topology maintained by this manager. The given job ID is not evaluated; every job gets
-  * the same topology object.
-  */
- @Override
- public NetworkTopology getNetworkTopology(JobID jobID) {
-
-     final NetworkTopology topology = this.networkTopology;
-     return topology;
- }
-
-
- /**
-  * Sets the listener which is notified about newly allocated resources. The field is written while holding the
-  * manager's lock.
-  */
- @Override
- public void setInstanceListener(InstanceListener instanceListener) {
-
-     final InstanceListener newListener = instanceListener;
-     synchronized (this.lock) {
-         this.instanceListener = newListener;
-     }
- }
-
-
- /**
-  * Returns a copy of the current instance type descriptions. The copy is filled while holding the manager's
-  * lock, so later changes to the internal map are not reflected in the returned map.
-  */
- @Override
- public Map<InstanceType, InstanceTypeDescription> getMapOfAvailableInstanceTypes() {
-
-     final Map<InstanceType, InstanceTypeDescription> snapshot =
-         new SerializableHashMap<InstanceType, InstanceTypeDescription>();
-
-     synchronized (this.lock) {
-         snapshot.putAll(this.instanceTypeDescriptionMap);
-     }
-
-     return snapshot;
- }
-
- /**
-  * Updates the list of instance type descriptions based on the currently registered hosts.
-  * <p>
-  * For every available instance type the method determines how many instances of that type can be provided
-  * (by hosts of exactly that type and by hosts of types it can be accommodated in) and a pessimistic hardware
-  * description, i.e. the minimum number of cores and the minimum memory sizes over all registered hosts of that
-  * type. If no host of the type is registered, the description may be derived from a previously processed type.
-  * The existing map is cleared and refilled with the new descriptions.
-  */
- private void updateInstaceTypeDescriptionMap() {
-
- // Rebuild the map from scratch; it is refilled at the end of this method
- this.instanceTypeDescriptionMap.clear();
-
- // Descriptions in the order of availableInstanceTypes; index i holds the description of type i
- final List<InstanceTypeDescription> instanceTypeDescriptionList = new ArrayList<InstanceTypeDescription>();
-
- // initialize array which stores the availability counter for each instance type
- final int[] numberOfInstances = new int[this.availableInstanceTypes.length];
- for (int i = 0; i < numberOfInstances.length; i++) {
- numberOfInstances[i] = 0;
- }
-
- // Shuffle through instance types
- for (int i = 0; i < this.availableInstanceTypes.length; i++) {
-
- final InstanceType currentInstanceType = this.availableInstanceTypes[i];
- // Count the registered hosts of exactly this type and track their weakest hardware figures
- int numberOfMatchingInstances = 0;
- int minNumberOfCPUCores = Integer.MAX_VALUE;
- long minSizeOfPhysicalMemory = Long.MAX_VALUE;
- long minSizeOfFreeMemory = Long.MAX_VALUE;
- final Iterator<ClusterInstance> it = this.registeredHosts.values().iterator();
- while (it.hasNext()) {
- final ClusterInstance clusterInstance = it.next();
- if (clusterInstance.getType().equals(currentInstanceType)) {
- ++numberOfMatchingInstances;
- final HardwareDescription hardwareDescription = clusterInstance.getHardwareDescription();
- minNumberOfCPUCores = Math.min(minNumberOfCPUCores, hardwareDescription.getNumberOfCPUCores());
- minSizeOfPhysicalMemory = Math.min(minSizeOfPhysicalMemory,
- hardwareDescription.getSizeOfPhysicalMemory());
- minSizeOfFreeMemory = Math.min(minSizeOfFreeMemory, hardwareDescription.getSizeOfFreeMemory());
- }
- }
-
- // Update number of instances: every host of type i can provide canBeAccommodated(j, i) instances of type j
- int highestAccommodationNumber = -1;
- int highestAccommodationIndex = -1;
- for (int j = 0; j < this.availableInstanceTypes.length; j++) {
- final int accommodationNumber = canBeAccommodated(j, i);
- // LOG.debug(this.availableInstanceTypes[j].getIdentifier() + " fits into "
- // + this.availableInstanceTypes[i].getIdentifier() + " " + accommodationNumber + " times");
- if (accommodationNumber > 0) {
- numberOfInstances[j] += numberOfMatchingInstances * accommodationNumber;
- // Remember the type which fits most often into type i (the first one wins on ties)
- if (accommodationNumber > highestAccommodationNumber) {
- highestAccommodationNumber = accommodationNumber;
- highestAccommodationIndex = j;
- }
- }
- }
-
- // Calculate hardware description
- HardwareDescription pessimisticHardwareDescription = null;
- if (minNumberOfCPUCores < Integer.MAX_VALUE && minSizeOfPhysicalMemory < Long.MAX_VALUE
- && minSizeOfFreeMemory < Long.MAX_VALUE) {
-
- // At least one host of this type is registered, use the minima observed above
- pessimisticHardwareDescription = HardwareDescriptionFactory.construct(minNumberOfCPUCores,
- minSizeOfPhysicalMemory, minSizeOfFreeMemory);
-
- } else {
-
- // NOTE(review): highestAccommodationIndex is the index of the type that fits most often INTO type i,
- // whereas the code below treats it as the index of a larger type -- confirm the intended semantics
- if (highestAccommodationIndex < i) { // Since highestAccommodationIndex smaller than my index, the
- // target instance must be more powerful
-
- // Entries with an index smaller than i have already been added to the list in earlier iterations
- final InstanceTypeDescription descriptionOfLargerInstanceType = instanceTypeDescriptionList
- .get(highestAccommodationIndex);
- if (descriptionOfLargerInstanceType.getHardwareDescription() != null) {
- final HardwareDescription hardwareDescriptionOfLargerInstanceType = descriptionOfLargerInstanceType
- .getHardwareDescription();
-
- // Scale the other type's figures down by the accommodation number
- final int numCores = hardwareDescriptionOfLargerInstanceType.getNumberOfCPUCores()
- / highestAccommodationNumber;
- final long physMem = hardwareDescriptionOfLargerInstanceType.getSizeOfPhysicalMemory()
- / highestAccommodationNumber;
- final long freeMem = hardwareDescriptionOfLargerInstanceType.getSizeOfFreeMemory()
- / highestAccommodationNumber;
-
- pessimisticHardwareDescription = HardwareDescriptionFactory.construct(numCores, physMem,
- freeMem);
- }
- }
- }
-
- // NOTE(review): numberOfInstances[i] only contains the contributions of the types 0..i at this point;
- // presumably this relies on the instance types being ordered from most to least powerful -- confirm
- instanceTypeDescriptionList.add(InstanceTypeDescriptionFactory.construct(currentInstanceType,
- pessimisticHardwareDescription, numberOfInstances[i]));
- }
-
- // Publish the freshly computed descriptions
- final Iterator<InstanceTypeDescription> it = instanceTypeDescriptionList.iterator();
- while (it.hasNext()) {
-
- final InstanceTypeDescription itd = it.next();
- this.instanceTypeDescriptionMap.put(itd.getInstanceType(), itd);
- }
- }
-
- /**
-  * Builds the instance accommodation matrix for the list of available instance types. The entry
-  * <code>[i][j]</code> states how many times the instance type at index <code>i</code> fits into the instance
-  * type at index <code>j</code>, which is the minimum of the integer ratios of cores, compute units, memory size
-  * and disk capacity. Entries on the diagonal are always 1.
-  *
-  * @return the instance accommodation matrix or <code>null</code> if the list of available instance types is
-  *         not set
-  */
- private int[][] calculateInstanceAccommodationMatrix() {
-
-     if (this.availableInstanceTypes == null) {
-         LOG.error("Cannot compute instance accommodation matrix: availableInstanceTypes is null");
-         return null;
-     }
-
-     final int numberOfTypes = this.availableInstanceTypes.length;
-     final int[][] matrix = new int[numberOfTypes][numberOfTypes];
-
-     for (int source = 0; source < numberOfTypes; source++) {
-         for (int target = 0; target < numberOfTypes; target++) {
-
-             // A type always fits into itself exactly once
-             if (source == target) {
-                 matrix[source][target] = 1;
-                 continue;
-             }
-
-             final InstanceType sourceType = this.availableInstanceTypes[source];
-             final InstanceType targetType = this.availableInstanceTypes[target];
-
-             // The scarcest resource limits how often the source type fits into the target type
-             final int byCores = targetType.getNumberOfCores() / sourceType.getNumberOfCores();
-             final int byComputeUnits = targetType.getNumberOfComputeUnits() / sourceType.getNumberOfComputeUnits();
-             final int byMemory = targetType.getMemorySize() / sourceType.getMemorySize();
-             final int byDisk = targetType.getDiskCapacity() / sourceType.getDiskCapacity();
-
-             int limit = Math.min(byMemory, byDisk);
-             limit = Math.min(byComputeUnits, limit);
-             limit = Math.min(byCores, limit);
-             matrix[source][target] = limit;
-         }
-     }
-
-     return matrix;
- }
-
- /**
- * Returns how many times the instance type stored at index <code>sourceTypeIndex</code> can be accommodated inside
- * the instance type stored at index <code>targetTypeIndex</code> in the list of available instance types.
- *
- * @param sourceTypeIndex
- * the index of the source instance type in the list of available instance types
- * @param targetTypeIndex
- * the index of the target instance type in the list of available instance types
- * @return the number of times the source type instance can be accommodated inside the target instance
- */
- private int canBeAccommodated(int sourceTypeIndex, int targetTypeIndex) {
-
- if (sourceTypeIndex >= this.availableInstanceTypes.length
- || targetTypeIndex >= this.availableInstanceTypes.length) {
- LOG.error("Cannot determine number of instance accomodations: invalid index");
- return 0;
- }
-
- return this.instanceAccommodationMatrix[targetTypeIndex][sourceTypeIndex];
- }
-
-
- /**
-  * Searches the registered hosts for an instance with the given name.
-  *
-  * @param name
-  *        the name to look for, must not be <code>null</code>
-  * @return the first registered instance whose name equals the given name or <code>null</code> if there is none
-  * @throws IllegalArgumentException
-  *         thrown if the given name is <code>null</code>
-  */
- @Override
- public AbstractInstance getInstanceByName(String name) {
-
-     if (name == null) {
-         throw new IllegalArgumentException("Argument name must not be null");
-     }
-
-     synchronized (this.lock) {
-         for (final AbstractInstance candidate : this.registeredHosts.values()) {
-             if (name.equals(candidate.getName())) {
-                 return candidate;
-             }
-         }
-     }
-
-     return null;
- }
-
-
- /**
-  * Discards all pending instance requests of the given job. The removal is done while holding the manager's
-  * lock; nothing happens if the job has no pending requests.
-  */
- @Override
- public void cancelPendingRequests(JobID jobID) {
-
-     final JobID jobToCancel = jobID;
-     synchronized (this.lock) {
-         this.pendingRequestsOfJob.remove(jobToCancel);
-     }
- }
-
- /**
-  * Returns the number of hosts currently registered with this manager.
-  *
-  * @return the number of registered hosts
-  */
- @Override
- public int getNumberOfTaskTrackers() {
-
-     // registeredHosts is modified under this.lock (see reportHeartBeat), so it is read under the same lock
-     // here, consistent with the other accessors of this class
-     synchronized (this.lock) {
-         return this.registeredHosts.size();
-     }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/PendingRequestsMap.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/PendingRequestsMap.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/PendingRequestsMap.java
deleted file mode 100644
index ddc90e9..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/PendingRequestsMap.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.instance.cluster;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import eu.stratosphere.nephele.instance.InstanceType;
-
-/**
- * Bookkeeping of the pending instance requests of a job, i.e. for each instance type the number of instances
- * which have been requested but could not be provided yet.
- * <p>
- * This class is not thread-safe.
- */
-public final class PendingRequestsMap {
-
-    /**
-     * Number of still missing instances, keyed by the requested instance type.
-     */
-    private final Map<InstanceType, Integer> pendingRequests = new HashMap<InstanceType, Integer>();
-
-    /**
-     * Tells whether at least one instance type still has an entry in this map.
-     *
-     * @return <code>true</code> if the map contains at least one entry, <code>false</code> otherwise
-     */
-    boolean hasPendingRequests() {
-
-        final boolean nothingPending = this.pendingRequests.isEmpty();
-        return !nothingPending;
-    }
-
-    /**
-     * Adds a pending request for the given number of instances of the given type. If the type already has
-     * pending requests, the given number is added to the existing counter.
-     *
-     * @param instanceType
-     *        the requested instance type
-     * @param numberOfInstances
-     *        the requested number of instances of this type
-     */
-    void addRequest(final InstanceType instanceType, final int numberOfInstances) {
-
-        final Integer alreadyPending = this.pendingRequests.get(instanceType);
-        final int newTotal = (alreadyPending == null) ? numberOfInstances
-            : alreadyPending.intValue() + numberOfInstances;
-
-        this.pendingRequests.put(instanceType, Integer.valueOf(newTotal));
-    }
-
-    /**
-     * Returns an iterator over the entries (instance type and number of pending instances) of this map.
-     *
-     * @return an iterator over the entry set of the underlying map
-     */
-    Iterator<Map.Entry<InstanceType, Integer>> iterator() {
-
-        return this.pendingRequests.entrySet().iterator();
-    }
-
-    /**
-     * Decreases the number of pending instances of the given type by one. The entry is removed from the map when
-     * its counter reaches zero. Nothing happens if the type has no entry.
-     *
-     * @param instanceType
-     *        the instance type for which the number of pending instances shall be decreased
-     */
-    void decreaseNumberOfPendingInstances(final InstanceType instanceType) {
-
-        final Integer current = this.pendingRequests.get(instanceType);
-        if (current == null) {
-            return;
-        }
-
-        final int remaining = current.intValue() - 1;
-        if (remaining != 0) {
-            this.pendingRequests.put(instanceType, Integer.valueOf(remaining));
-        } else {
-            this.pendingRequests.remove(instanceType);
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstance.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstance.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstance.java
deleted file mode 100644
index 795889e..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstance.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.instance.local;
-
-import eu.stratosphere.nephele.instance.AbstractInstance;
-import eu.stratosphere.nephele.instance.HardwareDescription;
-import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
-import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.topology.NetworkNode;
-import eu.stratosphere.nephele.topology.NetworkTopology;
-
-/**
- * An instance used by the local instance manager. It adds nothing to {@link AbstractInstance} except a string
- * representation based on the instance's connection information.
- */
-public class LocalInstance extends AbstractInstance {
-
-    /**
-     * Creates a new local instance; all arguments are passed on to the constructor of the super class.
-     */
-    public LocalInstance(InstanceType instanceType, InstanceConnectionInfo instanceConnectionInfo,
-            NetworkNode parentNode, NetworkTopology networkTopology, HardwareDescription hardwareDescription) {
-
-        super(instanceType, instanceConnectionInfo, parentNode, networkTopology, hardwareDescription);
-    }
-
-    /**
-     * Returns the string representation of this instance's connection information.
-     */
-    @Override
-    public String toString() {
-        return getInstanceConnectionInfo().toString();
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstanceManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstanceManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstanceManager.java
deleted file mode 100644
index e888b3f..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstanceManager.java
+++ /dev/null
@@ -1,418 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.instance.local;
-
-import java.io.File;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Iterator;
-
-import eu.stratosphere.nephele.ExecutionMode;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.configuration.ConfigConstants;
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.configuration.GlobalConfiguration;
-import eu.stratosphere.nephele.instance.AbstractInstance;
-import eu.stratosphere.nephele.instance.AllocatedResource;
-import eu.stratosphere.nephele.instance.AllocationID;
-import eu.stratosphere.nephele.instance.HardwareDescription;
-import eu.stratosphere.nephele.instance.HardwareDescriptionFactory;
-import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
-import eu.stratosphere.nephele.instance.InstanceException;
-import eu.stratosphere.nephele.instance.InstanceListener;
-import eu.stratosphere.nephele.instance.InstanceManager;
-import eu.stratosphere.nephele.instance.InstanceRequestMap;
-import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.instance.InstanceTypeDescription;
-import eu.stratosphere.nephele.instance.InstanceTypeDescriptionFactory;
-import eu.stratosphere.nephele.instance.InstanceTypeFactory;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.taskmanager.TaskManager;
-import eu.stratosphere.nephele.topology.NetworkTopology;
-import eu.stratosphere.nephele.util.SerializableHashMap;
-
-/**
- * The local instance manager is designed to manage instance allocation/deallocation for a single-node setup. It spawns a
- * task manager which is executed within the same process as the job manager. Moreover, it determines the hardware
- * characteristics of the machine it runs on and generates a default instance type with the identifier "default". If
- * desired this default instance type can also be overwritten.
- */
-public class LocalInstanceManager implements InstanceManager {
-
- /**
- * The log object used to report events and errors.
- */
- private static final Log LOG = LogFactory.getLog(LocalInstanceManager.class);
-
- /**
- * The key for the configuration parameter defining the instance type to be used by the local instance manager. If
- * the parameter is not set, a default instance type with the identifier "default" is generated from the machine's
- * hardware characteristics.
- */
-
- private static final String LOCALINSTANCE_TYPE_KEY = "instancemanager.local.type";
-
- private static final int SLEEP_TIME = 50;
-
- private static final int START_STOP_TIMEOUT = 2000;
-
-
- /**
- * The instance listener registered with this instance manager.
- */
- private InstanceListener instanceListener;
-
- /**
- * The default instance type which is either generated from the hardware characteristics of the machine the local
- * instance manager runs on or read from the configuration.
- */
- private final InstanceType defaultInstanceType;
-
- /**
- * A synchronization object to protect critical sections.
- */
- private final Object synchronizationObject = new Object();
-
- /**
- * Stores which task manager is currently occupied by a job.
- */
- private Map<LocalInstance, AllocatedResource> allocatedResources = new HashMap<LocalInstance, AllocatedResource>();
-
- /**
- * The local instances encapsulating the task managers
- */
- private Map<InstanceConnectionInfo, LocalInstance> localInstances = new HashMap<InstanceConnectionInfo,
- LocalInstance>();
-
- /**
- * The threads running the local task managers.
- */
- private final List<TaskManager> taskManagers = new ArrayList<TaskManager>();
-
- /**
- * The network topology the local instance is part of.
- */
- private final NetworkTopology networkTopology;
-
- /**
- * The map of instance type descriptions.
- */
- private final Map<InstanceType, InstanceTypeDescription> instanceTypeDescriptionMap;
-
- /**
- * Number of task managers
- */
- private final int numTaskManagers;
-
-
-
-
- /**
-  * Constructs a new local instance manager.
-  * <p>
-  * The default instance type is parsed from the configuration entry {@link #LOCALINSTANCE_TYPE_KEY} if it is
-  * set and can be parsed; otherwise it is obtained from <code>createDefaultInstanceType()</code>. Afterwards an
-  * empty network topology is created and the configured number of task managers is constructed, each with its
-  * own IPC and data port settings.
-  *
-  * @throws Exception
-  *         declared by the constructor; which of the calls below actually throw cannot be told from this code
-  */
- public LocalInstanceManager() throws Exception {
-
- final Configuration config = GlobalConfiguration.getConfiguration();
-
- // get the default instance type
- InstanceType type = null;
- final String descr = config.getString(LOCALINSTANCE_TYPE_KEY, null);
- if (descr != null) {
- LOG.info("Attempting to parse default instance type from string " + descr);
- type = InstanceTypeFactory.constructFromDescription(descr);
- if (type == null) {
- LOG.warn("Unable to parse default instance type from configuration, using hardware profile instead");
- }
- }
-
- // Fall back to a generated default type if nothing usable was configured
- this.defaultInstanceType = (type != null) ? type : createDefaultInstanceType();
-
- LOG.info("Default instance type is " + this.defaultInstanceType.getIdentifier());
-
- this.networkTopology = NetworkTopology.createEmptyTopology();
-
- this.instanceTypeDescriptionMap = new SerializableHashMap<InstanceType, InstanceTypeDescription>();
-
- // Number of task managers to construct, 1 if not configured
- numTaskManagers = GlobalConfiguration.getInteger(ConfigConstants
- .LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1);
-
- // More than one task manager is run in cluster mode, a single one in local mode
- ExecutionMode executionMode = (numTaskManagers > 1) ? ExecutionMode.CLUSTER : ExecutionMode.LOCAL;
-
- for(int i=0; i< numTaskManagers; i++){
-
- Configuration tm = new Configuration();
- // NOTE(review): the ports are re-read from the global configuration in every iteration, after the
- // previous iteration has included its offset ports via includeConfiguration() below. Presumably the
- // offsets therefore accumulate (base, base+1, base+3, ...) instead of being consecutive -- confirm
- int ipcPort = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT);
- int dataPort = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT);
-
- // Offset the ports by the task manager's index
- tm.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, ipcPort + i);
- tm.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort + i);
-
- // presumably the task manager created next reads its ports from the global configuration -- TODO confirm
- GlobalConfiguration.includeConfiguration(tm);
-
- TaskManager t = new TaskManager(executionMode);
- taskManagers.add(t);
- }
- }
-
-
- /**
-  * Returns the default instance type which was determined when this manager was constructed.
-  */
- @Override
- public InstanceType getDefaultInstanceType() {
-
-     final InstanceType defaultType = this.defaultInstanceType;
-     return defaultType;
- }
-
-
- /**
-  * Resolves an instance type by its name. The default instance type is the only type this method can return.
-  *
-  * @param instanceTypeName
-  *        the name of the instance type to look up
-  * @return the default instance type if its identifier equals the given name, <code>null</code> otherwise
-  */
- @Override
- public InstanceType getInstanceTypeByName(String instanceTypeName) {
-
-     final boolean isDefaultType = this.defaultInstanceType.getIdentifier().equals(instanceTypeName);
-     return isDefaultType ? this.defaultInstanceType : null;
- }
-
-
- /**
-  * Returns the default instance type if it satisfies the given requirements.
-  * <p>
-  * The default instance type is suitable if it offers at least the requested number of compute units, CPU
-  * cores, memory and disk capacity and if its price per hour does not exceed the given maximum price.
-  *
-  * @param minNumComputeUnits
-  *        the minimum number of compute units
-  * @param minNumCPUCores
-  *        the minimum number of CPU cores
-  * @param minMemorySize
-  *        the minimum memory size
-  * @param minDiskCapacity
-  *        the minimum disk capacity
-  * @param maxPricePerHour
-  *        the maximum price per hour the caller is willing to pay
-  * @return the default instance type if it meets all requirements, <code>null</code> otherwise
-  */
- @Override
- public InstanceType getSuitableInstanceType(int minNumComputeUnits, int minNumCPUCores,
-         int minMemorySize, int minDiskCapacity, int maxPricePerHour) {
-
-     if (minNumComputeUnits > this.defaultInstanceType.getNumberOfComputeUnits()) {
-         return null;
-     }
-
-     if (minNumCPUCores > this.defaultInstanceType.getNumberOfCores()) {
-         return null;
-     }
-
-     if (minMemorySize > this.defaultInstanceType.getMemorySize()) {
-         return null;
-     }
-
-     if (minDiskCapacity > this.defaultInstanceType.getDiskCapacity()) {
-         return null;
-     }
-
-     // The price is an upper bound: the type is unsuitable if it costs MORE than the caller accepts. The
-     // comparison used to be inverted, rejecting the type whenever the budget exceeded its price.
-     if (this.defaultInstanceType.getPricePerHour() > maxPricePerHour) {
-         return null;
-     }
-
-     return this.defaultInstanceType;
- }
-
-
- @Override
- public void releaseAllocatedResource(final JobID jobID, final Configuration conf,
- final AllocatedResource allocatedResource)
- throws InstanceException {
- LocalInstance instance = (LocalInstance) allocatedResource.getInstance();
-
- synchronized (this.synchronizationObject) {
- if(allocatedResources.containsKey(allocatedResource.getInstance())){
- if(allocatedResources.get(instance).equals(allocatedResource)){
- allocatedResources.remove(instance);
- return;
- }
- }
- throw new InstanceException("Resource with allocation ID " + allocatedResource.getAllocationID()
- + " has not been allocated to job with ID " + jobID
- + " according to the local instance manager's internal bookkeeping");
-
- }
- }
-
-
- @Override
- public void reportHeartBeat(final InstanceConnectionInfo instanceConnectionInfo,
- final HardwareDescription hardwareDescription) {
-
- synchronized (this.synchronizationObject) {
- if(!localInstances.containsKey(instanceConnectionInfo)){
- LocalInstance localInstance = new LocalInstance(this.defaultInstanceType, instanceConnectionInfo,
- this.networkTopology.getRootNode(), this.networkTopology, hardwareDescription);
- localInstances.put(instanceConnectionInfo, localInstance);
-
- this.instanceTypeDescriptionMap.put(this.defaultInstanceType, InstanceTypeDescriptionFactory
- .construct(this.defaultInstanceType, hardwareDescription, localInstances.size()));
- }
- }
- }
-
-
- @Override
- public void shutdown() {
- // Stop the task managers
- for(TaskManager t : taskManagers){
- t.shutdown();
- }
-
- boolean areAllTaskManagerShutdown = false;
- int timeout = START_STOP_TIMEOUT * this.taskManagers.size();
-
- for(int sleep = 0; sleep < timeout; sleep += SLEEP_TIME){
- areAllTaskManagerShutdown = true;
-
- for(TaskManager t: taskManagers){
- if(!t.isShutDown()){
- areAllTaskManagerShutdown = false;
- break;
- }
- }
-
- if(areAllTaskManagerShutdown){
- break;
- }
-
- try {
- Thread.sleep(SLEEP_TIME);
- }catch(InterruptedException e){
- break;
- }
- }
-
- if(!areAllTaskManagerShutdown){
- throw new RuntimeException(String.format("TaskManager shut down timed out (%d ms).", timeout));
- }
-
- instanceTypeDescriptionMap.clear();
-
- synchronized(this.synchronizationObject){
- for(LocalInstance instance: this.localInstances.values()){
- instance.destroyProxies();
- }
-
- localInstances.clear();
- }
- }
-
-
- @Override
- public NetworkTopology getNetworkTopology(final JobID jobID) {
- return this.networkTopology;
- }
-
-
- @Override
- public void setInstanceListener(final InstanceListener instanceListener) {
- this.instanceListener = instanceListener;
- }
-
- /**
- * Creates a default instance type based on the hardware characteristics of the machine that calls this method. The
- * default instance type contains the machine's number of CPU cores and size of physical memory. The disc capacity
- * is calculated from the free space in the directory for temporary files.
- *
- * @return the default instance type used for the local machine
- */
- public static final InstanceType createDefaultInstanceType() {
- final HardwareDescription hardwareDescription = HardwareDescriptionFactory.extractFromSystem();
-
- int diskCapacityInGB = 0;
- final String[] tempDirs = GlobalConfiguration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(File.pathSeparator);
-
- for (final String tempDir : tempDirs) {
- if (tempDir != null) {
- File f = new File(tempDir);
- diskCapacityInGB = Math.max(diskCapacityInGB, (int) (f.getFreeSpace() / (1024L * 1024L * 1024L)));
- }
- }
-
- final int physicalMemory = (int) (hardwareDescription.getSizeOfPhysicalMemory() / (1024L * 1024L));
-
- return InstanceTypeFactory.construct("default", hardwareDescription.getNumberOfCPUCores(),
- hardwareDescription.getNumberOfCPUCores(), physicalMemory, diskCapacityInGB, 0);
- }
-
-
- @Override
- public Map<InstanceType, InstanceTypeDescription> getMapOfAvailableInstanceTypes() {
- return this.instanceTypeDescriptionMap;
- }
-
- @Override
- public void requestInstance(final JobID jobID, final Configuration conf,
- final InstanceRequestMap instanceRequestMap,
- final List<String> splitAffinityList) throws InstanceException {
-
- // TODO: This can be implemented way simpler...
- // Iterate over all instance types
- final Iterator<Map.Entry<InstanceType, Integer>> it = instanceRequestMap.getMinimumIterator();
- final List<AllocatedResource> assignedResources = new ArrayList<AllocatedResource>();
- boolean assignmentSuccessful = true;
-
- while (it.hasNext()) {
-
- // Iterate over all requested instances of a specific type
- final Map.Entry<InstanceType, Integer> entry = it.next();
-
- for (int i = 0; i < entry.getValue().intValue(); i++) {
-
- synchronized (this.synchronizationObject) {
- boolean instanceFound = false;
- for(LocalInstance instance: localInstances.values()){
- if(!allocatedResources.containsKey(instance)){
- AllocatedResource assignedResource = new AllocatedResource(instance, entry.getKey(),
- new AllocationID());
- allocatedResources.put(instance, assignedResource);
- assignedResources.add(assignedResource);
- instanceFound = true;
- break;
- }
- }
-
- assignmentSuccessful &= instanceFound;
- }
- }
- }
-
- if(assignmentSuccessful){
- new LocalInstanceNotifier(this.instanceListener, jobID, assignedResources).start();
- }else{
- throw new InstanceException("Could not satisfy instance request.");
- }
- }
-
- @Override
- public AbstractInstance getInstanceByName(final String name) {
- if (name == null) {
- throw new IllegalArgumentException("Argument name must not be null");
- }
-
- synchronized (this.synchronizationObject) {
- for(LocalInstance instance :localInstances.values()){
- if(name.equals(instance.getName())){
- return instance;
- }
- }
- }
- return null;
- }
-
-
- @Override
- public void cancelPendingRequests(final JobID jobID) {
- // The local instance manager does not support pending requests, so nothing to do here
- }
-
- @Override
- public int getNumberOfTaskTrackers() {
- return localInstances.size();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstanceNotifier.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstanceNotifier.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstanceNotifier.java
deleted file mode 100644
index 52da691..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstanceNotifier.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.instance.local;
-
-import java.util.List;
-
-import eu.stratosphere.nephele.instance.AllocatedResource;
-import eu.stratosphere.nephele.instance.InstanceListener;
-import eu.stratosphere.nephele.jobgraph.JobID;
-
-/**
- * This class is an auxiliary class to send the notification
- * about the availability of an {@link AllocatedResource} to the given {@link InstanceListener} object. The notification
- * must be sent from
- * a separate thread, otherwise the atomic operation of requesting an instance
- * for a vertex and switching to the state ASSINING could not be guaranteed.
- * This class is thread-safe.
- *
- */
-public class LocalInstanceNotifier extends Thread {
-
- /**
- * The {@link InstanceListener} object to send the notification to.
- */
- private final InstanceListener instanceListener;
-
- /**
- * The ID of the job the new instance belongs to.
- */
- private final JobID jobID;
-
- /**
- * The resources allocated for the job.
- */
- private final List<AllocatedResource> allocatedResources;
-
- /**
- * Constructs a new instance notifier object.
- *
- * @param instanceListener
- * the listener object to send the notification to
- * @param jobID
- * the ID of the job the newly allocated resources belongs to
- * @param allocatedResource
- * the resources allocated for the job
- */
- public LocalInstanceNotifier(final InstanceListener instanceListener, final JobID jobID, final List<AllocatedResource> allocatedResources) {
- this.instanceListener = instanceListener;
- this.jobID = jobID;
- this.allocatedResources = allocatedResources;
- }
-
-
- @Override
- public void run() {
-
- this.instanceListener.resourcesAllocated(this.jobID, this.allocatedResources);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
index b4c51f2..d64c622 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
@@ -67,16 +67,6 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
private int numberOfSubtasks = -1;
/**
- * The type of instance to be assigned to this task at runtime.
- */
- private String instanceType = null;
-
- /**
- * Number of subtasks to share the same instance at runtime.
- */
- private int numberOfSubtasksPerInstance = -1;
-
- /**
* Number of retries in case of an error before the task represented by this vertex is considered as failed.
*/
private int numberOfExecutionRetries = -1;
@@ -150,8 +140,6 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
* the vertex this vertex should connect to
* @param channelType
* the channel type the two vertices should be connected by at runtime
- * @param compressionLevel
- * the compression level the corresponding channel should have at runtime
* @throws JobGraphDefinitionException
* thrown if the given vertex cannot be connected to <code>vertex</code> in the requested manner
*/
@@ -166,8 +154,8 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
* the vertex this vertex should connect to
* @param channelType
* the channel type the two vertices should be connected by at runtime
- * @param compressionLevel
- * the compression level the corresponding channel should have at runtime
+ * @param distributionPattern
+ * the distribution pattern between the two job vertices
* @throws JobGraphDefinitionException
* thrown if the given vertex cannot be connected to <code>vertex</code> in the requested manner
*/
@@ -184,14 +172,14 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
* the vertex this vertex should connect to
* @param channelType
* the channel type the two vertices should be connected by at runtime
- * @param compressionLevel
- * the compression level the corresponding channel should have at runtime
* @param indexOfOutputGate
* index of the producing task's output gate to be used, <code>-1</code> will determine the next free index
* number
* @param indexOfInputGate
* index of the consuming task's input gate to be used, <code>-1</code> will determine the next free index
* number
+ * @param distributionPattern
+ * the distribution pattern between the two job vertices
* @throws JobGraphDefinitionException
* thrown if the given vertex cannot be connected to <code>vertex</code> in the requested manner
*/
@@ -274,12 +262,12 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
* the job vertex to connect to
* @param channelType
* the channel type the two vertices should be connected by at runtime
- * @param compressionLevel
- * the compression level the corresponding channel should have at runtime
* @param indexOfOutputGate
* index of the producing task's output gate to be used
* @param indexOfInputGate
* index of the consuming task's input gate to be used
+ * @param distributionPattern
+ * the distribution pattern between the two job vertices
*/
private void connectBacklink(final AbstractJobVertex vertex, final ChannelType channelType,
final int indexOfOutputGate, final int indexOfInputGate,
@@ -364,32 +352,6 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
}
/**
- * Returns the index of the edge which is used to connect the given job vertex to this job vertex.
- *
- * @param jv
- * the connected job vertex
- * @return the index of the edge which is used to connect the given job vertex to this job vertex or -1 if the given
- * vertex is not connected to this job vertex
- */
- /*
- * public int getBackwardConnectionIndex(AbstractJobVertex jv) {
- * if(jv == null) {
- * return -1;
- * }
- * final Iterator<JobEdge> it = this.backwardEdges.iterator();
- * int i = 0;
- * while(it.hasNext()) {
- * final JobEdge edge = it.next();
- * if(edge.getConnectedVertex() == jv) {
- * return i;
- * }
- * i++;
- * }
- * return -1;
- * }
- */
-
- /**
* Returns the ID of this job vertex.
*
* @return the ID of this job vertex
@@ -407,15 +369,9 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
throw new IOException("jobGraph is null, cannot deserialize");
}
- // Read instance type
- this.instanceType = StringRecord.readString(in);
-
// Read number of subtasks
this.numberOfSubtasks = in.readInt();
- // Read number of subtasks per instance
- this.numberOfSubtasksPerInstance = in.readInt();
-
// Number of execution retries
this.numberOfExecutionRetries = in.readInt();
@@ -489,15 +445,9 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
@Override
public void write(final DataOutput out) throws IOException {
- // Instance type
- StringRecord.writeString(out, this.instanceType);
-
// Number of subtasks
out.writeInt(this.numberOfSubtasks);
- // Number of subtasks per instance
- out.writeInt(this.numberOfSubtasksPerInstance);
-
// Number of execution retries
out.writeInt(this.numberOfExecutionRetries);
@@ -595,44 +545,6 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
}
/**
- * Sets the instance type the task this vertex represents should run on.
- *
- * @param instanceType
- * the instance type the task this vertex represents should run on
- */
- public void setInstanceType(final String instanceType) {
- this.instanceType = instanceType;
- }
-
- /**
- * Returns the instance type the task this vertex represents should run on.
- *
- * @return the instance type the task this vertex represents should run on, <code>null</code> if unspecified
- */
- public String getInstanceType() {
- return this.instanceType;
- }
-
- /**
- * Sets the number of subtasks that should be assigned to the same instance.
- *
- * @param numberOfSubtasksPerInstance
- * the number of subtasks that should be assigned to the same instance
- */
- public void setNumberOfSubtasksPerInstance(final int numberOfSubtasksPerInstance) {
- this.numberOfSubtasksPerInstance = numberOfSubtasksPerInstance;
- }
-
- /**
- * Returns the number of subtasks that should be assigned to the same instance, <code>-1</code> if undefined.
- *
- * @return the number of subtasks that should be assigned to the same instance, <code>-1</code> if undefined
- */
- public int getNumberOfSubtasksPerInstance() {
- return this.numberOfSubtasksPerInstance;
- }
-
- /**
* Sets the vertex this vertex should share its instances with at runtime.
*
* @param vertex
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/DeploymentManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/DeploymentManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/DeploymentManager.java
index bf017ce..b043ecd 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/DeploymentManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/DeploymentManager.java
@@ -16,19 +16,19 @@ package eu.stratosphere.nephele.jobmanager;
import java.util.List;
import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
-import eu.stratosphere.nephele.instance.AbstractInstance;
+import eu.stratosphere.nephele.instance.Instance;
import eu.stratosphere.nephele.jobgraph.JobID;
/**
* A deployment manager is responsible for deploying a list of {@link ExecutionVertex} objects the given
- * {@link AbstractInstance}. It is called by a {@link AbstractScheduler} implementation whenever at least one
+ * {@link eu.stratosphere.nephele.instance.Instance}. It is called by a {@link eu.stratosphere.nephele.jobmanager.scheduler.DefaultScheduler} implementation whenever at least one
* {@link ExecutionVertex} has become ready to be executed.
*
*/
public interface DeploymentManager {
/**
- * Deploys the list of vertices on the given {@link AbstractInstance}.
+ * Deploys the list of vertices on the given {@link eu.stratosphere.nephele.instance.Instance}.
*
* @param jobID
* the ID of the job the vertices to be deployed belong to
@@ -37,5 +37,5 @@ public interface DeploymentManager {
* @param verticesToBeDeployed
* the list of vertices to be deployed
*/
- void deploy(JobID jobID, AbstractInstance instance, List<ExecutionVertex> verticesToBeDeployed);
+ void deploy(JobID jobID, Instance instance, List<ExecutionVertex> verticesToBeDeployed);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/EventCollector.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/EventCollector.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/EventCollector.java
index a6f9cfe..37f9a43 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/EventCollector.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/EventCollector.java
@@ -38,7 +38,7 @@ import eu.stratosphere.nephele.executiongraph.InternalJobStatus;
import eu.stratosphere.nephele.executiongraph.JobStatusListener;
import eu.stratosphere.nephele.executiongraph.ManagementGraphFactory;
import eu.stratosphere.nephele.executiongraph.VertexAssignmentListener;
-import eu.stratosphere.nephele.instance.AbstractInstance;
+import eu.stratosphere.nephele.instance.Instance;
import eu.stratosphere.nephele.instance.AllocatedResource;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.jobgraph.JobStatus;
@@ -266,10 +266,10 @@ public final class EventCollector extends TimerTask implements ProfilingListener
final ManagementVertexID managementVertexID = id.toManagementVertexID();
final long timestamp = System.currentTimeMillis();
- final AbstractInstance instance = newAllocatedResource.getInstance();
+ final Instance instance = newAllocatedResource.getInstance();
VertexAssignmentEvent event;
if (instance == null) {
- event = new VertexAssignmentEvent(timestamp, managementVertexID, "null", "null");
+ event = new VertexAssignmentEvent(timestamp, managementVertexID, "null");
} else {
String instanceName = null;
@@ -279,8 +279,7 @@ public final class EventCollector extends TimerTask implements ProfilingListener
instanceName = instance.toString();
}
- event = new VertexAssignmentEvent(timestamp, managementVertexID, instanceName, instance.getType()
- .getIdentifier());
+ event = new VertexAssignmentEvent(timestamp, managementVertexID, instanceName);
}
this.eventCollector.updateManagementGraph(jobID, event);
@@ -609,7 +608,6 @@ public final class EventCollector extends TimerTask implements ProfilingListener
}
vertex.setInstanceName(vertexAssignmentEvent.getInstanceName());
- vertex.setInstanceType(vertexAssignmentEvent.getInstanceType());
}
}