You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/26 11:46:32 UTC
[07/53] [abbrv] Rework the TaskManager to a slot-based model and remove legacy cloud code
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
index 6e25796..8a3cba4 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
@@ -16,13 +16,14 @@ package eu.stratosphere.nephele.jobmanager;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -32,6 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import eu.stratosphere.nephele.ExecutionMode;
import eu.stratosphere.nephele.managementgraph.ManagementVertexID;
import eu.stratosphere.nephele.taskmanager.TaskKillResult;
+
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
@@ -68,14 +70,11 @@ import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
import eu.stratosphere.nephele.executiongraph.GraphConversionException;
import eu.stratosphere.nephele.executiongraph.InternalJobStatus;
import eu.stratosphere.nephele.executiongraph.JobStatusListener;
-import eu.stratosphere.nephele.instance.AbstractInstance;
import eu.stratosphere.nephele.instance.DummyInstance;
import eu.stratosphere.nephele.instance.HardwareDescription;
+import eu.stratosphere.nephele.instance.Instance;
import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
import eu.stratosphere.nephele.instance.InstanceManager;
-import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.instance.InstanceTypeDescription;
-import eu.stratosphere.nephele.instance.local.LocalInstanceManager;
import eu.stratosphere.runtime.io.channels.ChannelID;
import eu.stratosphere.nephele.ipc.RPC;
import eu.stratosphere.nephele.ipc.Server;
@@ -85,7 +84,7 @@ import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.jobmanager.accumulators.AccumulatorManager;
import eu.stratosphere.nephele.jobmanager.archive.ArchiveListener;
import eu.stratosphere.nephele.jobmanager.archive.MemoryArchivist;
-import eu.stratosphere.nephele.jobmanager.scheduler.AbstractScheduler;
+import eu.stratosphere.nephele.jobmanager.scheduler.DefaultScheduler;
import eu.stratosphere.nephele.jobmanager.scheduler.SchedulingException;
import eu.stratosphere.nephele.jobmanager.splitassigner.InputSplitManager;
import eu.stratosphere.nephele.jobmanager.splitassigner.InputSplitWrapper;
@@ -106,6 +105,7 @@ import eu.stratosphere.nephele.taskmanager.TaskSubmissionResult;
import eu.stratosphere.runtime.io.network.ConnectionInfoLookupResponse;
import eu.stratosphere.runtime.io.network.RemoteReceiver;
import eu.stratosphere.nephele.taskmanager.ExecutorThreadFactory;
+import eu.stratosphere.nephele.taskmanager.transferenvelope.RegisterTaskManagerResult;
import eu.stratosphere.nephele.topology.NetworkTopology;
import eu.stratosphere.nephele.types.IntegerRecord;
import eu.stratosphere.nephele.util.SerializableArrayList;
@@ -135,7 +135,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
private final InputSplitManager inputSplitManager;
- private final AbstractScheduler scheduler;
+ private final DefaultScheduler scheduler;
private AccumulatorManager accumulatorManager;
@@ -213,20 +213,11 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
LOG.info("Starting job manager in " + executionMode + " mode");
// Try to load the instance manager for the given execution mode
- // Try to load the scheduler for the given execution mode
- if (executionMode == ExecutionMode.LOCAL) {
- try {
- this.instanceManager = new LocalInstanceManager();
- } catch (Throwable t) {
- throw new Exception("Cannot instantiate local instance manager: " + t.getMessage(), t);
- }
- } else {
- final String instanceManagerClassName = JobManagerUtils.getInstanceManagerClassName(executionMode);
- LOG.info("Trying to load " + instanceManagerClassName + " as instance manager");
- this.instanceManager = JobManagerUtils.loadInstanceManager(instanceManagerClassName);
- if (this.instanceManager == null) {
- throw new Exception("Unable to load instance manager " + instanceManagerClassName);
- }
+ final String instanceManagerClassName = JobManagerUtils.getInstanceManagerClassName(executionMode);
+ LOG.info("Trying to load " + instanceManagerClassName + " as instance manager");
+ this.instanceManager = JobManagerUtils.loadInstanceManager(instanceManagerClassName);
+ if (this.instanceManager == null) {
+ throw new Exception("Unable to load instance manager " + instanceManagerClassName);
}
// Try to load the scheduler for the given execution mode
@@ -479,7 +470,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
ExecutionGraph eg;
try {
- eg = new ExecutionGraph(job, this.instanceManager);
+ eg = new ExecutionGraph(job, this.getAvailableSlots());
} catch (GraphConversionException e) {
if (e.getCause() == null) {
return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, StringUtils.stringifyException(e));
@@ -520,7 +511,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
}
try {
- this.scheduler.schedulJob(eg);
+ this.scheduler.scheduleJob(eg);
} catch (SchedulingException e) {
unregisterJob(eg);
JobSubmissionResult result = new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, StringUtils.stringifyException(e));
@@ -561,10 +552,6 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
}
}
- // Cancel all pending requests for instances
- this.instanceManager.cancelPendingRequests(executionGraph.getJobID()); // getJobID is final member, no
- // synchronization necessary
-
// Remove job from input split manager
if (this.inputSplitManager != null) {
this.inputSplitManager.unregisterJob(executionGraph);
@@ -582,8 +569,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
@Override
- public void sendHeartbeat(final InstanceConnectionInfo instanceConnectionInfo,
- final HardwareDescription hardwareDescription) {
+ public void sendHeartbeat(final InstanceConnectionInfo instanceConnectionInfo) {
// Delegate call to instance manager
if (this.instanceManager != null) {
@@ -592,7 +578,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
@Override
public void run() {
- instanceManager.reportHeartBeat(instanceConnectionInfo, hardwareDescription);
+ instanceManager.reportHeartBeat(instanceConnectionInfo);
}
};
@@ -600,6 +586,25 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
}
}
+ @Override
+ public RegisterTaskManagerResult registerTaskManager(final InstanceConnectionInfo instanceConnectionInfo,
+ final HardwareDescription hardwareDescription, final IntegerRecord numberOfSlots){
+ if(this.instanceManager != null) {
+ final Runnable registerTaskManagerRunnable = new Runnable() {
+ @Override
+ public void run(){
+ instanceManager.registerTaskManager(instanceConnectionInfo, hardwareDescription,
+ numberOfSlots.getValue());
+ }
+ };
+
+ this.executorService.execute(registerTaskManagerRunnable);
+ return new RegisterTaskManagerResult(RegisterTaskManagerResult.ReturnCode.SUCCESS);
+ }
+
+ return new RegisterTaskManagerResult(RegisterTaskManagerResult.ReturnCode.FAILURE);
+ }
+
@Override
public void updateTaskExecutionState(final TaskExecutionState executionState) throws IOException {
@@ -730,9 +735,10 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
if (sourceChannelID.equals(edge.getInputChannelID())) {
// Request was sent from an input channel
+
final ExecutionVertex connectedVertex = edge.getOutputGate().getVertex();
- final AbstractInstance assignedInstance = connectedVertex.getAllocatedResource().getInstance();
+ final Instance assignedInstance = connectedVertex.getAllocatedResource().getInstance();
if (assignedInstance == null) {
LOG.error("Cannot resolve lookup: vertex found for channel ID " + edge.getOutputGateIndex()
+ " but no instance assigned");
@@ -758,6 +764,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
return ConnectionInfoLookupResponse.createReceiverFoundAndReady(edge.getOutputChannelID());
} else {
// Receiver runs on a different task manager
+
final InstanceConnectionInfo ici = assignedInstance.getInstanceConnectionInfo();
final InetSocketAddress isa = new InetSocketAddress(ici.address(), ici.dataPort());
@@ -788,7 +795,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
return ConnectionInfoLookupResponse.createReceiverNotReady();
}
- final AbstractInstance assignedInstance = targetVertex.getAllocatedResource().getInstance();
+ final Instance assignedInstance = targetVertex.getAllocatedResource().getInstance();
if (assignedInstance == null) {
LOG.error("Cannot resolve lookup: vertex found for channel ID " + edge.getInputChannelID() + " but no instance assigned");
// LOG.info("Created receiverNotReady for " + targetVertex + " in state " + executionState + " 4");
@@ -877,6 +884,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
return eventList;
}
+
@Override
public void killTask(final JobID jobID, final ManagementVertexID id) throws IOException {
@@ -909,10 +917,11 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
eg.executeCommand(runnable);
}
+
@Override
public void killInstance(final StringRecord instanceName) throws IOException {
- final AbstractInstance instance = this.instanceManager.getInstanceByName(instanceName.toString());
+ final Instance instance = this.instanceManager.getInstanceByName(instanceName.toString());
if (instance == null) {
LOG.error("Cannot find instance with name " + instanceName + " to kill it");
return;
@@ -947,16 +956,6 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
}
- public Map<InstanceType, InstanceTypeDescription> getMapOfAvailableInstanceTypes() {
-
- // Delegate call to the instance manager
- if (this.instanceManager != null) {
- return this.instanceManager.getMapOfAvailableInstanceTypes();
- }
-
- return null;
- }
-
@Override
public void jobStatusHasChanged(final ExecutionGraph executionGraph, final InternalJobStatus newJobStatus,
@@ -987,7 +986,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
return;
}
- final Set<AbstractInstance> allocatedInstance = new HashSet<AbstractInstance>();
+ final Set<Instance> allocatedInstance = new HashSet<Instance>();
final Iterator<ExecutionVertex> it = new ExecutionGraphIterator(eg, true);
while (it.hasNext()) {
@@ -995,7 +994,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
final ExecutionVertex vertex = it.next();
final ExecutionState state = vertex.getExecutionState();
if (state == ExecutionState.RUNNING || state == ExecutionState.FINISHING) {
- final AbstractInstance instance = vertex.getAllocatedResource().getInstance();
+ final Instance instance = vertex.getAllocatedResource().getInstance();
if (instance instanceof DummyInstance) {
LOG.error("Found instance of type DummyInstance for vertex " + vertex.getName() + " (state "
@@ -1013,7 +1012,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
@Override
public void run() {
- final Iterator<AbstractInstance> it2 = allocatedInstance.iterator();
+ final Iterator<Instance> it2 = allocatedInstance.iterator();
try {
while (it2.hasNext()) {
@@ -1030,9 +1029,14 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
this.executorService.execute(requestRunnable);
}
+ @Override
+ public int getAvailableSlots() {
+ return getInstanceManager().getNumberOfSlots();
+ }
+
@Override
- public void deploy(final JobID jobID, final AbstractInstance instance,
+ public void deploy(final JobID jobID, final Instance instance,
final List<ExecutionVertex> verticesToBeDeployed) {
if (verticesToBeDeployed.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManagerUtils.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManagerUtils.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManagerUtils.java
index 5b0b30d..45506aa 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManagerUtils.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManagerUtils.java
@@ -20,12 +20,11 @@ import java.lang.reflect.InvocationTargetException;
import java.util.Properties;
import eu.stratosphere.nephele.ExecutionMode;
-
+import eu.stratosphere.nephele.instance.InstanceManager;
+import eu.stratosphere.nephele.jobmanager.scheduler.DefaultScheduler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import eu.stratosphere.nephele.instance.InstanceManager;
-import eu.stratosphere.nephele.jobmanager.scheduler.AbstractScheduler;
import eu.stratosphere.util.StringUtils;
/**
@@ -47,7 +46,7 @@ public class JobManagerUtils {
/**
* Tries to locate a class with given name and to
- * instantiate a {@link AbstractScheduler} object from it.
+ * instantiate a {@link eu.stratosphere.nephele.jobmanager.scheduler.DefaultScheduler} object from it.
*
* @param schedulerClassName
* the name of the class to instantiate the scheduler object from
@@ -55,21 +54,21 @@ public class JobManagerUtils {
* the deployment manager which shall be passed on to the scheduler
* @param instanceManager
* the instance manager which shall be passed on to the scheduler
- * @return the {@link AbstractScheduler} object instantiated from the class with the provided name
+ * @return the {@link eu.stratosphere.nephele.jobmanager.scheduler.DefaultScheduler} object instantiated from the class with the provided name
*/
@SuppressWarnings("unchecked")
- static AbstractScheduler loadScheduler(final String schedulerClassName, final DeploymentManager deploymentManager,
+ static DefaultScheduler loadScheduler(final String schedulerClassName, final DeploymentManager deploymentManager,
final InstanceManager instanceManager) {
- Class<? extends AbstractScheduler> schedulerClass;
+ Class<? extends DefaultScheduler> schedulerClass;
try {
- schedulerClass = (Class<? extends AbstractScheduler>) Class.forName(schedulerClassName);
+ schedulerClass = (Class<? extends DefaultScheduler>) Class.forName(schedulerClassName);
} catch (ClassNotFoundException e) {
LOG.error("Cannot find class " + schedulerClassName + ": " + StringUtils.stringifyException(e));
return null;
}
- Constructor<? extends AbstractScheduler> constructor;
+ Constructor<? extends DefaultScheduler> constructor;
try {
@@ -83,7 +82,7 @@ public class JobManagerUtils {
return null;
}
- AbstractScheduler scheduler;
+ DefaultScheduler scheduler;
try {
scheduler = constructor.newInstance(deploymentManager, instanceManager);
@@ -110,7 +109,7 @@ public class JobManagerUtils {
*
* @param instanceManagerClassName
* the name of the class to instantiate the instance manager object from
- * @return the {@link InstanceManager} object instantiated from the class with the provided name
+ * @return the {@link eu.stratosphere.nephele.instance.InstanceManager} object instantiated from the class with the provided name
*/
@SuppressWarnings("unchecked")
static InstanceManager loadInstanceManager(final String instanceManagerClassName) {
@@ -139,53 +138,34 @@ public class JobManagerUtils {
}
/**
- * Tries to read the class name of the {@link AbstractScheduler} implementation from the global configuration which
+ * Tries to read the class name of the {@link eu.stratosphere.nephele.jobmanager.scheduler.DefaultScheduler} implementation from the global configuration which
* is set to be used for the provided execution mode.
*
* @param executionMode The Nephele execution mode.
- * @return the class name of the {@link AbstractScheduler} implementation to be used or <code>null</code> if no
+ * @return the class name of the {@link eu.stratosphere.nephele.jobmanager.scheduler.DefaultScheduler} implementation to be used or <code>null</code> if no
* implementation is configured for the given execution mode
*/
static String getSchedulerClassName(ExecutionMode executionMode) {
- switch (executionMode) {
- case LOCAL:
- return "eu.stratosphere.nephele.jobmanager.scheduler.local.LocalScheduler";
- case CLUSTER:
- return "eu.stratosphere.nephele.jobmanager.scheduler.queue.QueueScheduler";
- default:
- throw new RuntimeException("Unrecognized Execution Mode.");
- }
-// String modeClass = getClassStringForMode(executionMode);
-// String instanceManagerClassNameKey = "jobmanager.scheduler." + modeClass + ".classname";
-// String schedulerClassName = GlobalConfiguration.getString(instanceManagerClassNameKey, null);
-//
-// if (executionMode == ExecutionMode.LOCAL && schedulerClassName == null) {
-// schedulerClassName = ConfigConstants.DEFAULT_LOCAL_MODE_SCHEDULER;
-// }
-// return schedulerClassName;
+ return "eu.stratosphere.nephele.jobmanager.scheduler.DefaultScheduler";
}
/**
- * Tries to read the class name of the {@link InstanceManager} implementation from the global configuration which is
+ * Tries to read the class name of the {@link eu.stratosphere.nephele.instance.InstanceManager} implementation from the global configuration which is
* set to be used for the provided execution mode.
*
* @param executionMode The Nephele execution mode.
- * @return the class name of the {@link InstanceManager} implementation to be used or <code>null</code> if no
+ * @return the class name of the {@link eu.stratosphere.nephele.instance.InstanceManager} implementation to be used or <code>null</code> if no
* implementation is configured for the given execution mode
*/
static String getInstanceManagerClassName(ExecutionMode executionMode) {
switch (executionMode) {
case LOCAL:
- return "eu.stratosphere.nephele.instance.local.LocalInstanceManager";
+ return "eu.stratosphere.nephele.instance.LocalInstanceManager";
case CLUSTER:
- return "eu.stratosphere.nephele.instance.cluster.ClusterManager";
+ return "eu.stratosphere.nephele.instance.DefaultInstanceManager";
default:
throw new RuntimeException("Unrecognized Execution Mode.");
}
-//
-// final String modeClass = getClassStringForMode(executionMode);
-// final String instanceManagerClassNameKey = "jobmanager.instancemanager." + modeClass + ".classname";
-// return GlobalConfiguration.getString(instanceManagerClassNameKey, null);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/AbstractExecutionListener.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/AbstractExecutionListener.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/AbstractExecutionListener.java
deleted file mode 100644
index 5b528c7..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/AbstractExecutionListener.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.jobmanager.scheduler;
-
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-
-import eu.stratosphere.nephele.execution.ExecutionListener;
-import eu.stratosphere.nephele.execution.ExecutionState;
-import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
-import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertex;
-import eu.stratosphere.nephele.executiongraph.ExecutionPipeline;
-import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
-import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
-import eu.stratosphere.nephele.executiongraph.InternalJobStatus;
-import eu.stratosphere.nephele.jobgraph.JobID;
-
-public abstract class AbstractExecutionListener implements ExecutionListener {
-
- /**
- * The instance of the {@link LocalScheduler}.
- */
- private final AbstractScheduler scheduler;
-
- /**
- * The {@link ExecutionVertex} this wrapper object belongs to.
- */
- private final ExecutionVertex executionVertex;
-
- /**
- * Constructs a new wrapper object for the given {@link ExecutionVertex}.
- *
- * @param AbstractScheduler
- * the instance of the {@link AbstractScheduler}
- * @param executionVertex
- * the {@link ExecutionVertex} the received notification refer to
- */
- public AbstractExecutionListener(final AbstractScheduler scheduler, final ExecutionVertex executionVertex) {
- this.scheduler = scheduler;
- this.executionVertex = executionVertex;
- }
-
-
- @Override
- public void executionStateChanged(final JobID jobID, final ExecutionVertexID vertexID,
- final ExecutionState newExecutionState, final String optionalMessage) {
-
- final ExecutionGraph eg = this.executionVertex.getExecutionGraph();
-
- // Check if we can deploy a new pipeline.
- if (newExecutionState == ExecutionState.FINISHING) {
-
- final ExecutionPipeline pipeline = this.executionVertex.getExecutionPipeline();
- if (!pipeline.isFinishing()) {
- // Some tasks of the pipeline are still running
- return;
- }
-
- // Find another vertex in the group which is still in SCHEDULED state and get its pipeline.
- final ExecutionGroupVertex groupVertex = this.executionVertex.getGroupVertex();
- for (int i = 0; i < groupVertex.getCurrentNumberOfGroupMembers(); ++i) {
- final ExecutionVertex groupMember = groupVertex.getGroupMember(i);
- if (groupMember.compareAndUpdateExecutionState(ExecutionState.SCHEDULED, ExecutionState.ASSIGNED)) {
-
- final ExecutionPipeline pipelineToBeDeployed = groupMember.getExecutionPipeline();
- pipelineToBeDeployed.setAllocatedResource(this.executionVertex.getAllocatedResource());
- pipelineToBeDeployed.updateExecutionState(ExecutionState.ASSIGNED);
-
- this.scheduler.deployAssignedPipeline(pipelineToBeDeployed);
- return;
- }
- }
- }
-
- if (newExecutionState == ExecutionState.CANCELED || newExecutionState == ExecutionState.FINISHED) {
-
- synchronized (eg) {
-
- if (this.scheduler.getVerticesToBeRestarted().remove(this.executionVertex.getID()) != null) {
-
- if (eg.getJobStatus() == InternalJobStatus.FAILING) {
- return;
- }
-
- this.executionVertex.updateExecutionState(ExecutionState.ASSIGNED, "Restart as part of recovery");
-
- // Run through the deployment procedure
- this.scheduler.deployAssignedVertices(this.executionVertex);
- return;
- }
- }
- }
-
- if (newExecutionState == ExecutionState.FINISHED || newExecutionState == ExecutionState.CANCELED
- || newExecutionState == ExecutionState.FAILED) {
- // Check if instance can be released
- this.scheduler.checkAndReleaseAllocatedResource(eg, this.executionVertex.getAllocatedResource());
- }
-
- // In case of an error, check if the vertex shall be recovered
- if (newExecutionState == ExecutionState.FAILED) {
- if (this.executionVertex.decrementRetriesLeftAndCheck()) {
-
- final Set<ExecutionVertex> assignedVertices = new HashSet<ExecutionVertex>();
-
- if (RecoveryLogic.recover(this.executionVertex, this.scheduler.getVerticesToBeRestarted(),
- assignedVertices)) {
-
- if (RecoveryLogic.hasInstanceAssigned(this.executionVertex)) {
- // Run through the deployment procedure
- this.scheduler.deployAssignedVertices(assignedVertices);
- }
-
- } else {
-
- // Make sure the map with the vertices to be restarted is cleaned up properly
- synchronized (eg) {
-
- final Iterator<ExecutionVertex> it = this.scheduler.getVerticesToBeRestarted().values()
- .iterator();
-
- while (it.hasNext()) {
- if (eg.equals(it.next().getExecutionGraph())) {
- it.remove();
- }
- }
- }
-
- // Actual cancellation of job is performed by job manager
- }
- }
- }
-
- }
-
-
- @Override
- public void userThreadFinished(final JobID jobID, final ExecutionVertexID vertexID, final Thread userThread) {
- // Nothing to do here
- }
-
-
- @Override
- public void userThreadStarted(final JobID jobID, final ExecutionVertexID vertexID, final Thread userThread) {
- // Nothing to do here
- }
-
-
- @Override
- public int getPriority() {
-
- return 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/AbstractScheduler.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/AbstractScheduler.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/AbstractScheduler.java
deleted file mode 100644
index 24e2970..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/AbstractScheduler.java
+++ /dev/null
@@ -1,662 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.jobmanager.scheduler;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.nephele.execution.ExecutionState;
-import eu.stratosphere.nephele.executiongraph.ExecutionEdge;
-import eu.stratosphere.nephele.executiongraph.ExecutionGate;
-import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
-import eu.stratosphere.nephele.executiongraph.ExecutionGraphIterator;
-import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertex;
-import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertexIterator;
-import eu.stratosphere.nephele.executiongraph.ExecutionPipeline;
-import eu.stratosphere.nephele.executiongraph.ExecutionStage;
-import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
-import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
-import eu.stratosphere.nephele.executiongraph.InternalJobStatus;
-import eu.stratosphere.nephele.instance.AbstractInstance;
-import eu.stratosphere.nephele.instance.AllocatedResource;
-import eu.stratosphere.nephele.instance.AllocationID;
-import eu.stratosphere.nephele.instance.DummyInstance;
-import eu.stratosphere.nephele.instance.InstanceException;
-import eu.stratosphere.nephele.instance.InstanceListener;
-import eu.stratosphere.nephele.instance.InstanceManager;
-import eu.stratosphere.nephele.instance.InstanceRequestMap;
-import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.jobmanager.DeploymentManager;
-import eu.stratosphere.util.StringUtils;
-
-/**
- * This abstract scheduler must be extended by a scheduler implementations for Nephele. The abstract class defines the
- * fundamental methods for scheduling and removing jobs. While Nephele's
- * {@link eu.stratosphere.nephele.jobmanager.JobManager} is responsible for requesting the required instances for the
- * job at the {@link eu.stratosphere.nephele.instance.InstanceManager}, the scheduler is in charge of assigning the
- * individual tasks to the instances.
- *
- */
-public abstract class AbstractScheduler implements InstanceListener {
-
- /**
- * The LOG object to report events within the scheduler.
- */
- protected static final Log LOG = LogFactory.getLog(AbstractScheduler.class);
-
- /**
- * The instance manager assigned to this scheduler.
- */
- private final InstanceManager instanceManager;
-
- /**
- * The deployment manager assigned to this scheduler.
- */
- private final DeploymentManager deploymentManager;
-
- /**
- * Stores the vertices to be restarted once they have switched to the <code>CANCELED</code> state.
- */
- private final Map<ExecutionVertexID, ExecutionVertex> verticesToBeRestarted = new ConcurrentHashMap<ExecutionVertexID, ExecutionVertex>();
-
- /**
- * Constructs a new abstract scheduler.
- *
- * @param deploymentManager
- * the deployment manager assigned to this scheduler
- * @param instanceManager
- * the instance manager to be used with this scheduler
- */
- protected AbstractScheduler(final DeploymentManager deploymentManager, final InstanceManager instanceManager) {
-
- this.deploymentManager = deploymentManager;
- this.instanceManager = instanceManager;
- this.instanceManager.setInstanceListener(this);
- }
-
- /**
- * Adds a job represented by an {@link ExecutionGraph} object to the scheduler. The job is then executed according
- * to the strategies of the concrete scheduler implementation.
- *
- * @param executionGraph
- * the job to be added to the scheduler
- * @throws SchedulingException
- * thrown if an error occurs and the scheduler does not accept the new job
- */
- public abstract void schedulJob(ExecutionGraph executionGraph) throws SchedulingException;
-
- /**
- * Returns the execution graph which is associated with the given job ID.
- *
- * @param jobID
- * the job ID to search the execution graph for
- * @return the execution graph which belongs to the given job ID or <code>null</code if no such execution graph
- * exists
- */
- public abstract ExecutionGraph getExecutionGraphByID(JobID jobID);
-
- /**
- * Returns the {@link InstanceManager} object which is used by the current scheduler.
- *
- * @return the {@link InstanceManager} object which is used by the current scheduler
- */
- public InstanceManager getInstanceManager() {
- return this.instanceManager;
- }
-
- // void removeJob(JobID jobID);
-
- /**
- * Shuts the scheduler down. After shut down no jobs can be added to the scheduler.
- */
- public abstract void shutdown();
-
- /**
- * Collects the instances required to run the job from the given {@link ExecutionStage} and requests them at the
- * loaded instance manager.
- *
- * @param executionStage
- * the execution stage to collect the required instances from
- * @throws InstanceException
- * thrown if the given execution graph is already processing its final stage
- */
- protected void requestInstances(final ExecutionStage executionStage) throws InstanceException {
-
- final ExecutionGraph executionGraph = executionStage.getExecutionGraph();
- final InstanceRequestMap instanceRequestMap = new InstanceRequestMap();
-
- synchronized (executionStage) {
-
- executionStage.collectRequiredInstanceTypes(instanceRequestMap, ExecutionState.CREATED);
-
- final Iterator<Map.Entry<InstanceType, Integer>> it = instanceRequestMap.getMinimumIterator();
- LOG.info("Requesting the following instances for job " + executionGraph.getJobID());
- while (it.hasNext()) {
- final Map.Entry<InstanceType, Integer> entry = it.next();
- LOG.info(" " + entry.getKey() + " [" + entry.getValue().intValue() + ", "
- + instanceRequestMap.getMaximumNumberOfInstances(entry.getKey()) + "]");
- }
-
- if (instanceRequestMap.isEmpty()) {
- return;
- }
-
- this.instanceManager.requestInstance(executionGraph.getJobID(), executionGraph.getJobConfiguration(),
- instanceRequestMap, null);
-
- // Switch vertex state to assigning
- final ExecutionGraphIterator it2 = new ExecutionGraphIterator(executionGraph, executionGraph
- .getIndexOfCurrentExecutionStage(), true, true);
- while (it2.hasNext()) {
-
- it2.next().compareAndUpdateExecutionState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
- }
- }
- }
-
- void findVerticesToBeDeployed(final ExecutionVertex vertex,
- final Map<AbstractInstance, List<ExecutionVertex>> verticesToBeDeployed,
- final Set<ExecutionVertex> alreadyVisited) {
-
- if (!alreadyVisited.add(vertex)) {
- return;
- }
-
- if (vertex.compareAndUpdateExecutionState(ExecutionState.ASSIGNED, ExecutionState.READY)) {
- final AbstractInstance instance = vertex.getAllocatedResource().getInstance();
-
- if (instance instanceof DummyInstance) {
- LOG.error("Inconsistency: Vertex " + vertex + " is about to be deployed on a DummyInstance");
- }
-
- List<ExecutionVertex> verticesForInstance = verticesToBeDeployed.get(instance);
- if (verticesForInstance == null) {
- verticesForInstance = new ArrayList<ExecutionVertex>();
- verticesToBeDeployed.put(instance, verticesForInstance);
- }
-
- verticesForInstance.add(vertex);
- }
-
- final int numberOfOutputGates = vertex.getNumberOfOutputGates();
- for (int i = 0; i < numberOfOutputGates; ++i) {
-
- final ExecutionGate outputGate = vertex.getOutputGate(i);
- boolean deployTarget;
-
- switch (outputGate.getChannelType()) {
- case NETWORK:
- deployTarget = false;
- break;
- case IN_MEMORY:
- deployTarget = true;
- break;
- default:
- throw new IllegalStateException("Unknown channel type");
- }
-
- if (deployTarget) {
-
- final int numberOfOutputChannels = outputGate.getNumberOfEdges();
- for (int j = 0; j < numberOfOutputChannels; ++j) {
- final ExecutionEdge outputChannel = outputGate.getEdge(j);
- final ExecutionVertex connectedVertex = outputChannel.getInputGate().getVertex();
- findVerticesToBeDeployed(connectedVertex, verticesToBeDeployed, alreadyVisited);
- }
- }
- }
- }
-
- /**
- * Collects all execution vertices with the state ASSIGNED starting from the given start vertex and
- * deploys them on the assigned {@link AllocatedResource} objects.
- *
- * @param startVertex
- * the execution vertex to start the deployment from
- */
- public void deployAssignedVertices(final ExecutionVertex startVertex) {
-
- final JobID jobID = startVertex.getExecutionGraph().getJobID();
-
- final Map<AbstractInstance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<AbstractInstance, List<ExecutionVertex>>();
- final Set<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
-
- findVerticesToBeDeployed(startVertex, verticesToBeDeployed, alreadyVisited);
-
- if (!verticesToBeDeployed.isEmpty()) {
-
- final Iterator<Map.Entry<AbstractInstance, List<ExecutionVertex>>> it2 = verticesToBeDeployed
- .entrySet()
- .iterator();
-
- while (it2.hasNext()) {
-
- final Map.Entry<AbstractInstance, List<ExecutionVertex>> entry = it2.next();
- this.deploymentManager.deploy(jobID, entry.getKey(), entry.getValue());
- }
- }
- }
-
- /**
- * Collects all execution vertices with the state ASSIGNED from the given pipeline and deploys them on the assigned
- * {@link AllocatedResource} objects.
- *
- * @param pipeline
- * the execution pipeline to be deployed
- */
- public void deployAssignedPipeline(final ExecutionPipeline pipeline) {
-
- final JobID jobID = null;
-
- final Map<AbstractInstance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<AbstractInstance, List<ExecutionVertex>>();
- final Set<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
-
- final Iterator<ExecutionVertex> it = pipeline.iterator();
- while (it.hasNext()) {
- findVerticesToBeDeployed(it.next(), verticesToBeDeployed, alreadyVisited);
- }
-
- if (!verticesToBeDeployed.isEmpty()) {
-
- final Iterator<Map.Entry<AbstractInstance, List<ExecutionVertex>>> it2 = verticesToBeDeployed
- .entrySet()
- .iterator();
-
- while (it2.hasNext()) {
-
- final Map.Entry<AbstractInstance, List<ExecutionVertex>> entry = it2.next();
- this.deploymentManager.deploy(jobID, entry.getKey(), entry.getValue());
- }
- }
- }
-
- /**
- * Collects all execution vertices with the state ASSIGNED starting from the given collection of start vertices and
- * deploys them on the assigned {@link AllocatedResource} objects.
- *
- * @param startVertices
- * the collection of execution vertices to start the deployment from
- */
- public void deployAssignedVertices(final Collection<ExecutionVertex> startVertices) {
-
- JobID jobID = null;
-
- final Map<AbstractInstance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<AbstractInstance, List<ExecutionVertex>>();
- final Set<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
-
- for (final ExecutionVertex startVertex : startVertices) {
-
- if (jobID == null) {
- jobID = startVertex.getExecutionGraph().getJobID();
- }
-
- findVerticesToBeDeployed(startVertex, verticesToBeDeployed, alreadyVisited);
- }
-
- if (!verticesToBeDeployed.isEmpty()) {
-
- final Iterator<Map.Entry<AbstractInstance, List<ExecutionVertex>>> it2 = verticesToBeDeployed
- .entrySet()
- .iterator();
-
- while (it2.hasNext()) {
-
- final Map.Entry<AbstractInstance, List<ExecutionVertex>> entry = it2.next();
- this.deploymentManager.deploy(jobID, entry.getKey(), entry.getValue());
- }
- }
- }
-
- /**
- * Collects all execution vertices with the state ASSIGNED starting from the input vertices of the current execution
- * stage and deploys them on the assigned {@link AllocatedResource} objects.
- *
- * @param executionGraph
- * the execution graph to collect the vertices from
- */
- public void deployAssignedInputVertices(final ExecutionGraph executionGraph) {
-
- final Map<AbstractInstance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<AbstractInstance, List<ExecutionVertex>>();
- final ExecutionStage executionStage = executionGraph.getCurrentExecutionStage();
-
- final Set<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
-
- for (int i = 0; i < executionStage.getNumberOfStageMembers(); ++i) {
-
- final ExecutionGroupVertex startVertex = executionStage.getStageMember(i);
- if (!startVertex.isInputVertex()) {
- continue;
- }
-
- for (int j = 0; j < startVertex.getCurrentNumberOfGroupMembers(); ++j) {
- final ExecutionVertex vertex = startVertex.getGroupMember(j);
- findVerticesToBeDeployed(vertex, verticesToBeDeployed, alreadyVisited);
- }
- }
-
- if (!verticesToBeDeployed.isEmpty()) {
-
- final Iterator<Map.Entry<AbstractInstance, List<ExecutionVertex>>> it2 = verticesToBeDeployed
- .entrySet()
- .iterator();
-
- while (it2.hasNext()) {
-
- final Map.Entry<AbstractInstance, List<ExecutionVertex>> entry = it2.next();
- this.deploymentManager.deploy(executionGraph.getJobID(), entry.getKey(), entry.getValue());
- }
- }
- }
-
-
- @Override
- public void resourcesAllocated(final JobID jobID, final List<AllocatedResource> allocatedResources) {
-
- if (allocatedResources == null) {
- LOG.error("Resource to lock is null!");
- return;
- }
-
- for (final AllocatedResource allocatedResource : allocatedResources) {
- if (allocatedResource.getInstance() instanceof DummyInstance) {
- LOG.debug("Available instance is of type DummyInstance!");
- return;
- }
- }
-
- final ExecutionGraph eg = getExecutionGraphByID(jobID);
-
- if (eg == null) {
- /*
- * The job have have been canceled in the meantime, in this case
- * we release the instance immediately.
- */
- try {
- for (final AllocatedResource allocatedResource : allocatedResources) {
- getInstanceManager().releaseAllocatedResource(jobID, null, allocatedResource);
- }
- } catch (InstanceException e) {
- LOG.error(e);
- }
- return;
- }
-
- final Runnable command = new Runnable() {
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void run() {
-
- final ExecutionStage stage = eg.getCurrentExecutionStage();
-
- synchronized (stage) {
-
- for (final AllocatedResource allocatedResource : allocatedResources) {
-
- AllocatedResource resourceToBeReplaced = null;
- // Important: only look for instances to be replaced in the current stage
- final Iterator<ExecutionGroupVertex> groupIterator = new ExecutionGroupVertexIterator(eg, true,
- stage.getStageNumber());
- while (groupIterator.hasNext()) {
-
- final ExecutionGroupVertex groupVertex = groupIterator.next();
- for (int i = 0; i < groupVertex.getCurrentNumberOfGroupMembers(); ++i) {
-
- final ExecutionVertex vertex = groupVertex.getGroupMember(i);
-
- if (vertex.getExecutionState() == ExecutionState.SCHEDULED
- && vertex.getAllocatedResource() != null) {
- // In local mode, we do not consider any topology, only the instance type
- if (vertex.getAllocatedResource().getInstanceType().equals(
- allocatedResource.getInstanceType())) {
- resourceToBeReplaced = vertex.getAllocatedResource();
- break;
- }
- }
- }
-
- if (resourceToBeReplaced != null) {
- break;
- }
- }
-
- // For some reason, we don't need this instance
- if (resourceToBeReplaced == null) {
- LOG.error("Instance " + allocatedResource.getInstance() + " is not required for job"
- + eg.getJobID());
- try {
- getInstanceManager().releaseAllocatedResource(jobID, eg.getJobConfiguration(),
- allocatedResource);
- } catch (InstanceException e) {
- LOG.error(e);
- }
- return;
- }
-
- // Replace the selected instance
- final Iterator<ExecutionVertex> it = resourceToBeReplaced.assignedVertices();
- while (it.hasNext()) {
- final ExecutionVertex vertex = it.next();
- vertex.setAllocatedResource(allocatedResource);
- vertex.updateExecutionState(ExecutionState.ASSIGNED);
- }
- }
- }
-
- // Deploy the assigned vertices
- deployAssignedInputVertices(eg);
-
- }
-
- };
-
- eg.executeCommand(command);
- }
-
- /**
- * Checks if the given {@link AllocatedResource} is still required for the
- * execution of the given execution graph. If the resource is no longer
- * assigned to a vertex that is either currently running or about to run
- * the given resource is returned to the instance manager for deallocation.
- *
- * @param executionGraph
- * the execution graph the provided resource has been used for so far
- * @param allocatedResource
- * the allocated resource to check the assignment for
- */
- public void checkAndReleaseAllocatedResource(final ExecutionGraph executionGraph,
- final AllocatedResource allocatedResource) {
-
- if (allocatedResource == null) {
- LOG.error("Resource to lock is null!");
- return;
- }
-
- if (allocatedResource.getInstance() instanceof DummyInstance) {
- LOG.debug("Available instance is of type DummyInstance!");
- return;
- }
-
- boolean resourceCanBeReleased = true;
- final Iterator<ExecutionVertex> it = allocatedResource.assignedVertices();
- while (it.hasNext()) {
- final ExecutionVertex vertex = it.next();
- final ExecutionState state = vertex.getExecutionState();
-
- if (state != ExecutionState.CREATED && state != ExecutionState.FINISHED
- && state != ExecutionState.FAILED && state != ExecutionState.CANCELED) {
-
- resourceCanBeReleased = false;
- break;
- }
- }
-
- if (resourceCanBeReleased) {
-
- LOG.info("Releasing instance " + allocatedResource.getInstance());
- try {
- getInstanceManager().releaseAllocatedResource(executionGraph.getJobID(), executionGraph
- .getJobConfiguration(), allocatedResource);
- } catch (InstanceException e) {
- LOG.error(StringUtils.stringifyException(e));
- }
- }
- }
-
- DeploymentManager getDeploymentManager() {
- return this.deploymentManager;
- }
-
- protected void replayCheckpointsFromPreviousStage(final ExecutionGraph executionGraph) {
-
- final int currentStageIndex = executionGraph.getIndexOfCurrentExecutionStage();
- final ExecutionStage previousStage = executionGraph.getStage(currentStageIndex - 1);
-
- final List<ExecutionVertex> verticesToBeReplayed = new ArrayList<ExecutionVertex>();
-
- for (int i = 0; i < previousStage.getNumberOfOutputExecutionVertices(); ++i) {
-
- final ExecutionVertex vertex = previousStage.getOutputExecutionVertex(i);
- vertex.updateExecutionState(ExecutionState.ASSIGNED);
- verticesToBeReplayed.add(vertex);
- }
-
- deployAssignedVertices(verticesToBeReplayed);
- }
-
- /**
- * Returns a map of vertices to be restarted once they have switched to their <code>CANCELED</code> state.
- *
- * @return the map of vertices to be restarted
- */
- Map<ExecutionVertexID, ExecutionVertex> getVerticesToBeRestarted() {
-
- return this.verticesToBeRestarted;
- }
-
-
- @Override
- public void allocatedResourcesDied(final JobID jobID, final List<AllocatedResource> allocatedResources) {
-
- final ExecutionGraph eg = getExecutionGraphByID(jobID);
-
- if (eg == null) {
- LOG.error("Cannot find execution graph for job with ID " + jobID);
- return;
- }
-
- final Runnable command = new Runnable() {
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void run() {
-
- synchronized (eg) {
-
- for (final AllocatedResource allocatedResource : allocatedResources) {
-
- LOG.info("Resource " + allocatedResource.getInstance().getName() + " for Job " + jobID
- + " died.");
-
- final ExecutionGraph executionGraph = getExecutionGraphByID(jobID);
-
- if (executionGraph == null) {
- LOG.error("Cannot find execution graph for job " + jobID);
- return;
- }
-
- Iterator<ExecutionVertex> vertexIter = allocatedResource.assignedVertices();
-
- // Assign vertices back to a dummy resource.
- final DummyInstance dummyInstance = DummyInstance.createDummyInstance(allocatedResource
- .getInstance()
- .getType());
- final AllocatedResource dummyResource = new AllocatedResource(dummyInstance,
- allocatedResource.getInstanceType(), new AllocationID());
-
- while (vertexIter.hasNext()) {
- final ExecutionVertex vertex = vertexIter.next();
- vertex.setAllocatedResource(dummyResource);
- }
-
- final String failureMessage = allocatedResource.getInstance().getName() + " died";
-
- vertexIter = allocatedResource.assignedVertices();
-
- while (vertexIter.hasNext()) {
- final ExecutionVertex vertex = vertexIter.next();
- final ExecutionState state = vertex.getExecutionState();
-
- switch (state) {
- case ASSIGNED:
- case READY:
- case STARTING:
- case RUNNING:
- case FINISHING:
-
- vertex.updateExecutionState(ExecutionState.FAILED, failureMessage);
-
- break;
- default:
- }
- }
-
- // TODO: Fix this
- /*
- * try {
- * requestInstances(this.executionVertex.getGroupVertex().getExecutionStage());
- * } catch (InstanceException e) {
- * e.printStackTrace();
- * // TODO: Cancel the entire job in this case
- * }
- */
- }
- }
-
- final InternalJobStatus js = eg.getJobStatus();
- if (js != InternalJobStatus.FAILING && js != InternalJobStatus.FAILED) {
-
- // TODO: Fix this
- // deployAssignedVertices(eg);
-
- final ExecutionStage stage = eg.getCurrentExecutionStage();
-
- try {
- requestInstances(stage);
- } catch (InstanceException e) {
- e.printStackTrace();
- // TODO: Cancel the entire job in this case
- }
- }
- }
- };
-
- eg.executeCommand(command);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/DefaultExecutionListener.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/DefaultExecutionListener.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/DefaultExecutionListener.java
new file mode 100644
index 0000000..86b3c40
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/DefaultExecutionListener.java
@@ -0,0 +1,127 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.jobmanager.scheduler;
+
+import eu.stratosphere.nephele.execution.ExecutionListener;
+import eu.stratosphere.nephele.execution.ExecutionState;
+import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
+import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertex;
+import eu.stratosphere.nephele.executiongraph.ExecutionPipeline;
+import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
+import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
+import eu.stratosphere.nephele.executiongraph.InternalJobStatus;
+import eu.stratosphere.nephele.jobgraph.JobID;
+
+/**
+ * Listener for one {@link ExecutionVertex}: deploys follow-up pipelines of its group, restarts the vertex if it is
+ * registered for recovery, and triggers the release check of its allocated resource.
+ */
+public class DefaultExecutionListener implements ExecutionListener {
+
+	/** The scheduler used to deploy vertices and pipelines and to release allocated resources. */
+	private final DefaultScheduler scheduler;
+
+	/** The vertex whose state changes this listener handles. */
+	private final ExecutionVertex executionVertex;
+
+	/**
+	 * Creates a listener for the given vertex.
+	 * @param scheduler the {@link DefaultScheduler} used for deployment and resource release
+	 * @param executionVertex the {@link ExecutionVertex} the received notifications refer to
+	 */
+	public DefaultExecutionListener(final DefaultScheduler scheduler, final ExecutionVertex executionVertex) {
+		this.scheduler = scheduler;
+		this.executionVertex = executionVertex;
+	}
+
+	/**
+	 * Reacts to a state change of the observed vertex. The <code>jobID</code>, <code>vertexID</code> and
+	 * <code>optionalMessage</code> arguments are not consulted; the listener acts on the vertex it was built for.
+	 */
+	@Override
+	public void executionStateChanged(final JobID jobID, final ExecutionVertexID vertexID,
+			final ExecutionState newExecutionState, final String optionalMessage) {
+		final ExecutionGraph graph = this.executionVertex.getExecutionGraph();
+
+		if (newExecutionState == ExecutionState.FINISHING) {
+			deployNextPipelineOfGroup();
+			return;
+		}
+
+		final boolean canceledOrFinished = newExecutionState == ExecutionState.CANCELED
+			|| newExecutionState == ExecutionState.FINISHED;
+
+		// A vertex registered for recovery is restarted (or dropped) instead of having its resource checked
+		if (canceledOrFinished && restartIfRegisteredForRecovery(graph)) {
+			return;
+		}
+		if (canceledOrFinished || newExecutionState == ExecutionState.FAILED) {
+			this.scheduler.checkAndReleaseAllocatedResource(graph, this.executionVertex.getAllocatedResource());
+		}
+	}
+
+	/**
+	 * If the whole pipeline of the observed vertex is finishing, moves the first group member still in state
+	 * <code>SCHEDULED</code> to <code>ASSIGNED</code> and deploys its pipeline on the observed vertex's resource.
+	 */
+	private void deployNextPipelineOfGroup() {
+		if (!this.executionVertex.getExecutionPipeline().isFinishing()) {
+			return; // other tasks of this pipeline are still running
+		}
+		final ExecutionGroupVertex group = this.executionVertex.getGroupVertex();
+		for (int idx = 0; idx < group.getCurrentNumberOfGroupMembers(); idx++) {
+			final ExecutionVertex candidate = group.getGroupMember(idx);
+			if (!candidate.compareAndUpdateExecutionState(ExecutionState.SCHEDULED, ExecutionState.ASSIGNED)) {
+				continue;
+			}
+			final ExecutionPipeline next = candidate.getExecutionPipeline();
+			next.setAllocatedResource(this.executionVertex.getAllocatedResource());
+			next.updateExecutionState(ExecutionState.ASSIGNED);
+			this.scheduler.deployAssignedPipeline(next);
+			return;
+		}
+	}
+
+	/**
+	 * Removes the observed vertex from the scheduler's restart map and, unless the job is failing, re-deploys it.
+	 * @return <code>true</code> if the vertex was registered for restart, <code>false</code> otherwise
+	 */
+	private boolean restartIfRegisteredForRecovery(final ExecutionGraph graph) {
+		synchronized (graph) {
+			if (this.scheduler.getVerticesToBeRestarted().remove(this.executionVertex.getID()) == null) {
+				return false;
+			}
+			if (graph.getJobStatus() != InternalJobStatus.FAILING) {
+				this.executionVertex.updateExecutionState(ExecutionState.ASSIGNED, "Restart as part of recovery");
+				this.scheduler.deployAssignedVertices(this.executionVertex);
+			}
+			return true;
+		}
+	}
+
+	@Override
+	public void userThreadFinished(final JobID jobID, final ExecutionVertexID vertexID, final Thread userThread) {
+		// Nothing to do here
+	}
+
+	@Override
+	public void userThreadStarted(final JobID jobID, final ExecutionVertexID vertexID, final Thread userThread) {
+		// Nothing to do here
+	}
+
+	@Override
+	public int getPriority() {
+		return 0;
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/DefaultScheduler.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/DefaultScheduler.java
new file mode 100644
index 0000000..745b199
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/DefaultScheduler.java
@@ -0,0 +1,762 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.jobmanager.scheduler;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.Deque;
+import java.util.ArrayDeque;
+
+import eu.stratosphere.nephele.executiongraph.ExecutionEdge;
+import eu.stratosphere.nephele.executiongraph.ExecutionGate;
+import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
+import eu.stratosphere.nephele.executiongraph.ExecutionGraphIterator;
+import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertex;
+import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertexIterator;
+import eu.stratosphere.nephele.executiongraph.ExecutionPipeline;
+import eu.stratosphere.nephele.executiongraph.ExecutionStage;
+import eu.stratosphere.nephele.executiongraph.ExecutionStageListener;
+import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
+import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
+import eu.stratosphere.nephele.executiongraph.InternalJobStatus;
+import eu.stratosphere.nephele.executiongraph.JobStatusListener;
+import eu.stratosphere.nephele.instance.AllocatedResource;
+import eu.stratosphere.nephele.instance.AllocationID;
+import eu.stratosphere.nephele.instance.DummyInstance;
+import eu.stratosphere.nephele.instance.InstanceException;
+import eu.stratosphere.nephele.instance.InstanceListener;
+import eu.stratosphere.nephele.instance.InstanceManager;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import eu.stratosphere.nephele.execution.ExecutionState;
+import eu.stratosphere.nephele.instance.Instance;
+import eu.stratosphere.nephele.jobgraph.JobID;
+import eu.stratosphere.nephele.jobmanager.DeploymentManager;
+import eu.stratosphere.util.StringUtils;
+
+/**
+ * The default scheduler for Nephele. While Nephele's
+ * {@link eu.stratosphere.nephele.jobmanager.JobManager} is responsible for requesting the required instances for the
+ * job at the {@link eu.stratosphere.nephele.instance.InstanceManager}, the scheduler is in charge of assigning the
+ * individual tasks to the instances.
+ * <p>
+ * Submitted jobs are kept in an internal queue. The queue is a plain {@link ArrayDeque}, so every access to it
+ * synchronizes on the queue object itself.
+ */
+public class DefaultScheduler implements InstanceListener, JobStatusListener, ExecutionStageListener {
+
+    /**
+     * The LOG object to report events within the scheduler.
+     */
+    protected static final Log LOG = LogFactory.getLog(DefaultScheduler.class);
+
+    /**
+     * The instance manager assigned to this scheduler.
+     */
+    private final InstanceManager instanceManager;
+
+    /**
+     * The deployment manager assigned to this scheduler.
+     */
+    private final DeploymentManager deploymentManager;
+
+    /**
+     * Stores the vertices to be restarted once they have switched to the <code>CANCELED</code> state.
+     */
+    private final Map<ExecutionVertexID, ExecutionVertex> verticesToBeRestarted = new ConcurrentHashMap<ExecutionVertexID, ExecutionVertex>();
+
+    /**
+     * The job queue where all submitted jobs go to. Not thread-safe by itself: guarded by its own monitor.
+     */
+    private final Deque<ExecutionGraph> jobQueue = new ArrayDeque<ExecutionGraph>();
+
+    /**
+     * Constructs a new default scheduler and registers it as the instance listener of the given instance manager.
+     *
+     * @param deploymentManager
+     *        the deployment manager assigned to this scheduler
+     * @param instanceManager
+     *        the instance manager to be used with this scheduler
+     */
+    public DefaultScheduler(final DeploymentManager deploymentManager, final InstanceManager instanceManager) {
+
+        this.deploymentManager = deploymentManager;
+        this.instanceManager = instanceManager;
+        // All fields are initialized before 'this' is handed out to the instance manager
+        this.instanceManager.setInstanceListener(this);
+    }
+
+    /**
+     * Removes the job represented by the given {@link ExecutionGraph} from the scheduler. Jobs are matched by their
+     * job ID; an error is logged if no matching job is queued.
+     *
+     * @param executionGraphToRemove
+     *        the job to be removed
+     */
+    void removeJobFromSchedule(final ExecutionGraph executionGraphToRemove) {
+
+        boolean removedFromQueue = false;
+
+        synchronized (this.jobQueue) {
+
+            final Iterator<ExecutionGraph> it = this.jobQueue.iterator();
+            while (it.hasNext()) {
+
+                final ExecutionGraph executionGraph = it.next();
+                if (executionGraph.getJobID().equals(executionGraphToRemove.getJobID())) {
+                    removedFromQueue = true;
+                    it.remove();
+                    break;
+                }
+            }
+        }
+
+        if (!removedFromQueue) {
+            LOG.error("Cannot find job " + executionGraphToRemove.getJobName() + " ("
+                + executionGraphToRemove.getJobID() + ") to remove");
+        }
+    }
+
+    /**
+     * Adds a job represented by an {@link ExecutionGraph} object to the scheduler. The scheduler registers itself
+     * (and one {@link DefaultExecutionListener} per vertex) as listener, enqueues the job and requests the slots for
+     * the job's current execution stage.
+     *
+     * @param executionGraph
+     *        the job to be added to the scheduler
+     * @throws SchedulingException
+     *         thrown if the job needs more slots than the instance manager offers, or if requesting the instances
+     *         for the first stage fails
+     */
+    public void scheduleJob(final ExecutionGraph executionGraph) throws SchedulingException {
+
+        final int requiredSlots = executionGraph.getRequiredSlots();
+        final int availableSlots = this.getInstanceManager().getNumberOfSlots();
+
+        if (requiredSlots > availableSlots) {
+            throw new SchedulingException("Not enough slots to schedule job " + executionGraph.getJobID());
+        }
+
+        // Subscribe to job status notifications
+        executionGraph.registerJobStatusListener(this);
+
+        // Register execution listener for each vertex
+        final ExecutionGraphIterator it2 = new ExecutionGraphIterator(executionGraph, true);
+        while (it2.hasNext()) {
+
+            final ExecutionVertex vertex = it2.next();
+            vertex.registerExecutionListener(new DefaultExecutionListener(this, vertex));
+        }
+
+        // Register the scheduler as an execution stage listener
+        executionGraph.registerExecutionStageListener(this);
+
+        // Add job to the job queue before requesting instances: resourcesAllocated() looks the job up in the
+        // queue and releases the resources if it cannot find it
+        synchronized (this.jobQueue) {
+            this.jobQueue.add(executionGraph);
+        }
+
+        // Request resources for the first stage of the job
+        final ExecutionStage executionStage = executionGraph.getCurrentExecutionStage();
+        try {
+            requestInstances(executionStage);
+        } catch (InstanceException e) {
+            final String exceptionMessage = StringUtils.stringifyException(e);
+            LOG.error(exceptionMessage);
+            // The queue is not thread-safe; removal must hold its monitor just like every other access
+            synchronized (this.jobQueue) {
+                this.jobQueue.remove(executionGraph);
+            }
+            throw new SchedulingException(exceptionMessage);
+        }
+    }
+
+    /**
+     * Returns the execution graph which is associated with the given job ID.
+     *
+     * @param jobID
+     *        the job ID to search the execution graph for
+     * @return the execution graph which belongs to the given job ID or <code>null</code> if no such execution graph
+     *         exists
+     */
+    public ExecutionGraph getExecutionGraphByID(final JobID jobID) {
+
+        synchronized (this.jobQueue) {
+            for (final ExecutionGraph executionGraph : this.jobQueue) {
+                if (executionGraph.getJobID().equals(jobID)) {
+                    return executionGraph;
+                }
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Shuts the scheduler down by discarding all queued jobs.
+     */
+    public void shutdown() {
+
+        synchronized (this.jobQueue) {
+            this.jobQueue.clear();
+        }
+    }
+
+    /**
+     * Removes a job from the scheduler once it has reached a terminal state (<code>FAILED</code>,
+     * <code>FINISHED</code> or <code>CANCELED</code>).
+     *
+     * @param executionGraph
+     *        the job whose status changed
+     * @param newJobStatus
+     *        the new status of the job
+     * @param optionalMessage
+     *        an optional message describing the change (unused)
+     */
+    public void jobStatusHasChanged(final ExecutionGraph executionGraph, final InternalJobStatus newJobStatus,
+            final String optionalMessage) {
+
+        if (newJobStatus == InternalJobStatus.FAILED || newJobStatus == InternalJobStatus.FINISHED
+            || newJobStatus == InternalJobStatus.CANCELED) {
+            removeJobFromSchedule(executionGraph);
+        }
+    }
+
+    /**
+     * Requests the slots for the newly entered execution stage and deploys the input vertices of that stage that
+     * already have resources assigned.
+     *
+     * @param jobID
+     *        the ID of the job that entered the next stage (unused)
+     * @param executionStage
+     *        the execution stage that has been entered
+     */
+    public void nextExecutionStageEntered(final JobID jobID, final ExecutionStage executionStage) {
+
+        // Request new instances if necessary
+        try {
+            requestInstances(executionStage);
+        } catch (InstanceException e) {
+            // TODO: Handle error correctly
+            LOG.error(StringUtils.stringifyException(e));
+        }
+
+        // Deploy the assigned vertices
+        deployAssignedInputVertices(executionStage.getExecutionGraph());
+    }
+
+    /**
+     * Returns the {@link eu.stratosphere.nephele.instance.InstanceManager} object which is used by the current scheduler.
+     *
+     * @return the {@link eu.stratosphere.nephele.instance.InstanceManager} object which is used by the current scheduler
+     */
+    public InstanceManager getInstanceManager() {
+        return this.instanceManager;
+    }
+
+    /**
+     * Requests the number of slots required by the given {@link ExecutionStage} at the instance manager and moves the
+     * vertices of the graph's current stage from <code>CREATED</code> to <code>SCHEDULED</code>.
+     *
+     * @param executionStage
+     *        the execution stage to collect the required instances from
+     * @throws InstanceException
+     *         thrown if requesting the slots at the instance manager fails
+     */
+    protected void requestInstances(final ExecutionStage executionStage) throws InstanceException {
+
+        final ExecutionGraph executionGraph = executionStage.getExecutionGraph();
+
+        synchronized (executionStage) {
+
+            final int requiredSlots = executionStage.getRequiredSlots();
+
+            LOG.info("Requesting " + requiredSlots + " slots for job " + executionGraph.getJobID());
+
+            this.instanceManager.requestInstance(executionGraph.getJobID(), executionGraph.getJobConfiguration(),
+                requiredSlots);
+
+            // Switch vertex state to scheduled
+            final ExecutionGraphIterator it2 = new ExecutionGraphIterator(executionGraph, executionGraph
+                .getIndexOfCurrentExecutionStage(), true, true);
+            while (it2.hasNext()) {
+
+                it2.next().compareAndUpdateExecutionState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
+            }
+        }
+    }
+
+    /**
+     * Recursively collects, starting at the given vertex, all vertices that can be switched from
+     * <code>ASSIGNED</code> to <code>READY</code> and groups them by the instance they are assigned to. The switch to
+     * <code>READY</code> happens as a side effect. The traversal only follows <code>IN_MEMORY</code> output gates;
+     * targets of <code>NETWORK</code> gates are not visited.
+     *
+     * @param vertex
+     *        the vertex to start from
+     * @param verticesToBeDeployed
+     *        output map from instance to the vertices to deploy on it
+     * @param alreadyVisited
+     *        vertices visited so far; guards against revisiting vertices in the traversal
+     */
+    void findVerticesToBeDeployed(final ExecutionVertex vertex,
+            final Map<Instance, List<ExecutionVertex>> verticesToBeDeployed,
+            final Set<ExecutionVertex> alreadyVisited) {
+
+        if (!alreadyVisited.add(vertex)) {
+            return;
+        }
+
+        if (vertex.compareAndUpdateExecutionState(ExecutionState.ASSIGNED, ExecutionState.READY)) {
+            final Instance instance = vertex.getAllocatedResource().getInstance();
+
+            if (instance instanceof DummyInstance) {
+                LOG.error("Inconsistency: Vertex " + vertex + " is about to be deployed on a DummyInstance");
+            }
+
+            List<ExecutionVertex> verticesForInstance = verticesToBeDeployed.get(instance);
+            if (verticesForInstance == null) {
+                verticesForInstance = new ArrayList<ExecutionVertex>();
+                verticesToBeDeployed.put(instance, verticesForInstance);
+            }
+
+            verticesForInstance.add(vertex);
+        }
+
+        final int numberOfOutputGates = vertex.getNumberOfOutputGates();
+        for (int i = 0; i < numberOfOutputGates; ++i) {
+
+            final ExecutionGate outputGate = vertex.getOutputGate(i);
+            boolean deployTarget;
+
+            switch (outputGate.getChannelType()) {
+            case NETWORK:
+                deployTarget = false;
+                break;
+            case IN_MEMORY:
+                deployTarget = true;
+                break;
+            default:
+                throw new IllegalStateException("Unknown channel type");
+            }
+
+            if (deployTarget) {
+
+                final int numberOfOutputChannels = outputGate.getNumberOfEdges();
+                for (int j = 0; j < numberOfOutputChannels; ++j) {
+                    final ExecutionEdge outputChannel = outputGate.getEdge(j);
+                    final ExecutionVertex connectedVertex = outputChannel.getInputGate().getVertex();
+                    findVerticesToBeDeployed(connectedVertex, verticesToBeDeployed, alreadyVisited);
+                }
+            }
+        }
+    }
+
+    /**
+     * Hands every per-instance group of collected vertices to the deployment manager.
+     *
+     * @param jobID
+     *        the ID of the job the vertices belong to
+     * @param verticesToBeDeployed
+     *        map from instance to the vertices to deploy on it; may be empty
+     */
+    private void deployVertices(final JobID jobID, final Map<Instance, List<ExecutionVertex>> verticesToBeDeployed) {
+
+        for (final Map.Entry<Instance, List<ExecutionVertex>> entry : verticesToBeDeployed.entrySet()) {
+            this.deploymentManager.deploy(jobID, entry.getKey(), entry.getValue());
+        }
+    }
+
+    /**
+     * Collects all execution vertices with the state ASSIGNED starting from the given start vertex and
+     * deploys them on the assigned {@link eu.stratosphere.nephele.instance.AllocatedResource} objects.
+     *
+     * @param startVertex
+     *        the execution vertex to start the deployment from
+     */
+    public void deployAssignedVertices(final ExecutionVertex startVertex) {
+
+        final JobID jobID = startVertex.getExecutionGraph().getJobID();
+
+        final Map<Instance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<Instance, List<ExecutionVertex>>();
+        final Set<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
+
+        findVerticesToBeDeployed(startVertex, verticesToBeDeployed, alreadyVisited);
+
+        deployVertices(jobID, verticesToBeDeployed);
+    }
+
+    /**
+     * Collects all execution vertices with the state ASSIGNED from the given pipeline and deploys them on the assigned
+     * {@link eu.stratosphere.nephele.instance.AllocatedResource} objects.
+     *
+     * @param pipeline
+     *        the execution pipeline to be deployed
+     */
+    public void deployAssignedPipeline(final ExecutionPipeline pipeline) {
+
+        // Derived from the pipeline's vertices; previously hard-coded to null and passed on to the deployment manager
+        JobID jobID = null;
+
+        final Map<Instance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<Instance, List<ExecutionVertex>>();
+        final Set<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
+
+        final Iterator<ExecutionVertex> it = pipeline.iterator();
+        while (it.hasNext()) {
+            final ExecutionVertex vertex = it.next();
+            if (jobID == null) {
+                jobID = vertex.getExecutionGraph().getJobID();
+            }
+            findVerticesToBeDeployed(vertex, verticesToBeDeployed, alreadyVisited);
+        }
+
+        deployVertices(jobID, verticesToBeDeployed);
+    }
+
+    /**
+     * Collects all execution vertices with the state ASSIGNED starting from the given collection of start vertices and
+     * deploys them on the assigned {@link eu.stratosphere.nephele.instance.AllocatedResource} objects. The job ID is
+     * taken from the first start vertex.
+     *
+     * @param startVertices
+     *        the collection of execution vertices to start the deployment from
+     */
+    public void deployAssignedVertices(final Collection<ExecutionVertex> startVertices) {
+
+        JobID jobID = null;
+
+        final Map<Instance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<Instance, List<ExecutionVertex>>();
+        final Set<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
+
+        for (final ExecutionVertex startVertex : startVertices) {
+
+            if (jobID == null) {
+                jobID = startVertex.getExecutionGraph().getJobID();
+            }
+
+            findVerticesToBeDeployed(startVertex, verticesToBeDeployed, alreadyVisited);
+        }
+
+        deployVertices(jobID, verticesToBeDeployed);
+    }
+
+    /**
+     * Collects all execution vertices with the state ASSIGNED starting from the input vertices of the current execution
+     * stage and deploys them on the assigned {@link eu.stratosphere.nephele.instance.AllocatedResource} objects.
+     *
+     * @param executionGraph
+     *        the execution graph to collect the vertices from
+     */
+    public void deployAssignedInputVertices(final ExecutionGraph executionGraph) {
+
+        final Map<Instance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<Instance, List<ExecutionVertex>>();
+        final Set<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
+        final ExecutionStage executionStage = executionGraph.getCurrentExecutionStage();
+
+        for (int i = 0; i < executionStage.getNumberOfStageMembers(); ++i) {
+
+            final ExecutionGroupVertex startVertex = executionStage.getStageMember(i);
+            if (!startVertex.isInputVertex()) {
+                continue;
+            }
+
+            for (int j = 0; j < startVertex.getCurrentNumberOfGroupMembers(); ++j) {
+                findVerticesToBeDeployed(startVertex.getGroupMember(j), verticesToBeDeployed, alreadyVisited);
+            }
+        }
+
+        deployVertices(executionGraph.getJobID(), verticesToBeDeployed);
+    }
+
+    /**
+     * Copies the vertices currently assigned to the given resource into a list, so callers can re-assign those
+     * vertices without iterating a collection that re-assignment may modify.
+     *
+     * @param resource
+     *        the resource whose assigned vertices are copied
+     * @return a new list containing the assigned vertices
+     */
+    private static List<ExecutionVertex> snapshotAssignedVertices(final AllocatedResource resource) {
+
+        final List<ExecutionVertex> vertices = new ArrayList<ExecutionVertex>();
+        final Iterator<ExecutionVertex> it = resource.assignedVertices();
+        while (it.hasNext()) {
+            vertices.add(it.next());
+        }
+        return vertices;
+    }
+
+    /**
+     * Searches the given stage of the execution graph for a vertex in state <code>SCHEDULED</code> that has an
+     * allocated resource attached.
+     *
+     * @param executionGraph
+     *        the execution graph to search
+     * @param stage
+     *        the stage to restrict the search to
+     * @return the allocated resource of the first such vertex, or <code>null</code> if there is none
+     */
+    private static AllocatedResource findResourceToBeReplaced(final ExecutionGraph executionGraph,
+            final ExecutionStage stage) {
+
+        // Important: only look for instances to be replaced in the current stage
+        final Iterator<ExecutionGroupVertex> groupIterator = new ExecutionGroupVertexIterator(executionGraph, true,
+            stage.getStageNumber());
+        while (groupIterator.hasNext()) {
+
+            final ExecutionGroupVertex groupVertex = groupIterator.next();
+            for (int i = 0; i < groupVertex.getCurrentNumberOfGroupMembers(); ++i) {
+
+                final ExecutionVertex vertex = groupVertex.getGroupMember(i);
+                if (vertex.getExecutionState() == ExecutionState.SCHEDULED && vertex.getAllocatedResource() != null) {
+                    return vertex.getAllocatedResource();
+                }
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Handles resources newly allocated for a job. If the job is no longer queued, the resources are released
+     * immediately. Otherwise, on the job's command executor, each resource replaces the resource of a
+     * <code>SCHEDULED</code> vertex in the current stage; those vertices become <code>ASSIGNED</code>. Resources that
+     * are not needed are released. Finally, the assigned input vertices of the current stage are deployed.
+     *
+     * @param jobID
+     *        the ID of the job the resources were allocated for
+     * @param allocatedResources
+     *        the newly allocated resources
+     */
+    @Override
+    public void resourcesAllocated(final JobID jobID, final List<AllocatedResource> allocatedResources) {
+
+        if (allocatedResources == null) {
+            LOG.error("Resource to lock is null!");
+            return;
+        }
+
+        for (final AllocatedResource allocatedResource : allocatedResources) {
+            if (allocatedResource.getInstance() instanceof DummyInstance) {
+                LOG.debug("Available instance is of type DummyInstance!");
+                return;
+            }
+        }
+
+        final ExecutionGraph eg = getExecutionGraphByID(jobID);
+
+        if (eg == null) {
+            /*
+             * The job may have been canceled in the meantime; in this case
+             * we release the resources immediately. Each release is attempted
+             * independently so one failure does not leak the others.
+             */
+            for (final AllocatedResource allocatedResource : allocatedResources) {
+                try {
+                    getInstanceManager().releaseAllocatedResource(allocatedResource);
+                } catch (InstanceException e) {
+                    LOG.error(StringUtils.stringifyException(e));
+                }
+            }
+            return;
+        }
+
+        final Runnable command = new Runnable() {
+
+            /**
+             * {@inheritDoc}
+             */
+            @Override
+            public void run() {
+
+                final ExecutionStage stage = eg.getCurrentExecutionStage();
+
+                synchronized (stage) {
+
+                    for (final AllocatedResource allocatedResource : allocatedResources) {
+
+                        final AllocatedResource resourceToBeReplaced = findResourceToBeReplaced(eg, stage);
+
+                        if (resourceToBeReplaced == null) {
+                            // For some reason, we don't need this instance: give it back, but keep handling the
+                            // remaining resources and still deploy whatever has been assigned so far
+                            LOG.error("Instance " + allocatedResource.getInstance() + " is not required for job "
+                                + eg.getJobID());
+                            try {
+                                getInstanceManager().releaseAllocatedResource(allocatedResource);
+                            } catch (InstanceException e) {
+                                LOG.error(StringUtils.stringifyException(e));
+                            }
+                            continue;
+                        }
+
+                        // Replace the selected resource on every vertex that was assigned to it
+                        for (final ExecutionVertex vertex : snapshotAssignedVertices(resourceToBeReplaced)) {
+                            vertex.setAllocatedResource(allocatedResource);
+                            vertex.updateExecutionState(ExecutionState.ASSIGNED);
+                        }
+                    }
+                }
+
+                // Deploy the assigned vertices
+                deployAssignedInputVertices(eg);
+            }
+        };
+
+        eg.executeCommand(command);
+    }
+
+    /**
+     * Checks if the given {@link AllocatedResource} is still required for the
+     * execution of the given execution graph. If none of the vertices assigned to
+     * the resource is in a state other than <code>CREATED</code>, <code>FINISHED</code>,
+     * <code>FAILED</code> or <code>CANCELED</code>, the resource is returned to the
+     * instance manager for deallocation. Dummy resources are never released.
+     *
+     * @param executionGraph
+     *        the execution graph the provided resource has been used for so far
+     * @param allocatedResource
+     *        the allocated resource to check the assignment for
+     */
+    public void checkAndReleaseAllocatedResource(final ExecutionGraph executionGraph,
+            final AllocatedResource allocatedResource) {
+
+        if (allocatedResource == null) {
+            LOG.error("Resource to lock is null!");
+            return;
+        }
+
+        if (allocatedResource.getInstance() instanceof DummyInstance) {
+            LOG.debug("Available instance is of type DummyInstance!");
+            return;
+        }
+
+        boolean resourceCanBeReleased = true;
+        final Iterator<ExecutionVertex> it = allocatedResource.assignedVertices();
+        while (it.hasNext()) {
+            final ExecutionVertex vertex = it.next();
+            final ExecutionState state = vertex.getExecutionState();
+
+            if (state != ExecutionState.CREATED && state != ExecutionState.FINISHED
+                && state != ExecutionState.FAILED && state != ExecutionState.CANCELED) {
+
+                resourceCanBeReleased = false;
+                break;
+            }
+        }
+
+        if (resourceCanBeReleased) {
+
+            LOG.info("Releasing instance " + allocatedResource.getInstance());
+            try {
+                getInstanceManager().releaseAllocatedResource(allocatedResource);
+            } catch (InstanceException e) {
+                LOG.error(StringUtils.stringifyException(e));
+            }
+        }
+    }
+
+    /**
+     * Returns the deployment manager assigned to this scheduler.
+     *
+     * @return the deployment manager assigned to this scheduler
+     */
+    DeploymentManager getDeploymentManager() {
+        return this.deploymentManager;
+    }
+
+    /**
+     * Marks the output vertices of the stage preceding the current one as <code>ASSIGNED</code> and deploys them again.
+     * NOTE(review): assumes the current stage is not the first one; index -1 is passed to getStage otherwise.
+     *
+     * @param executionGraph
+     *        the execution graph whose previous stage is replayed
+     */
+    protected void replayCheckpointsFromPreviousStage(final ExecutionGraph executionGraph) {
+
+        final int currentStageIndex = executionGraph.getIndexOfCurrentExecutionStage();
+        final ExecutionStage previousStage = executionGraph.getStage(currentStageIndex - 1);
+
+        final List<ExecutionVertex> verticesToBeReplayed = new ArrayList<ExecutionVertex>();
+
+        for (int i = 0; i < previousStage.getNumberOfOutputExecutionVertices(); ++i) {
+
+            final ExecutionVertex vertex = previousStage.getOutputExecutionVertex(i);
+            vertex.updateExecutionState(ExecutionState.ASSIGNED);
+            verticesToBeReplayed.add(vertex);
+        }
+
+        deployAssignedVertices(verticesToBeReplayed);
+    }
+
+    /**
+     * Returns a map of vertices to be restarted once they have switched to their <code>CANCELED</code> state.
+     *
+     * @return the map of vertices to be restarted
+     */
+    Map<ExecutionVertexID, ExecutionVertex> getVerticesToBeRestarted() {
+
+        return this.verticesToBeRestarted;
+    }
+
+    /**
+     * Handles the death of resources allocated for a job. On the job's command executor, every vertex assigned to a
+     * dead resource is moved to a fresh dummy resource. Vertices in state <code>ASSIGNED</code>, <code>READY</code>,
+     * <code>STARTING</code>, <code>RUNNING</code> or <code>FINISHING</code> are then marked <code>FAILED</code>.
+     * Unless the job is failing or failed, new instances are requested for the current stage.
+     *
+     * @param jobID
+     *        the ID of the job the resources belonged to
+     * @param allocatedResources
+     *        the resources that died
+     */
+    @Override
+    public void allocatedResourcesDied(final JobID jobID, final List<AllocatedResource> allocatedResources) {
+
+        final ExecutionGraph eg = getExecutionGraphByID(jobID);
+
+        if (eg == null) {
+            LOG.error("Cannot find execution graph for job with ID " + jobID);
+            return;
+        }
+
+        final Runnable command = new Runnable() {
+
+            /**
+             * {@inheritDoc}
+             */
+            @Override
+            public void run() {
+
+                synchronized (eg) {
+
+                    for (final AllocatedResource allocatedResource : allocatedResources) {
+
+                        LOG.info("Resource " + allocatedResource.getInstance().getName() + " for Job " + jobID
+                            + " died.");
+
+                        final ExecutionGraph executionGraph = getExecutionGraphByID(jobID);
+
+                        if (executionGraph == null) {
+                            LOG.error("Cannot find execution graph for job " + jobID);
+                            return;
+                        }
+
+                        // Snapshot the affected vertices before re-assigning them. NOTE(review): if
+                        // setAllocatedResource detaches a vertex from its previous resource, a second pass over
+                        // allocatedResource.assignedVertices() would no longer see that vertex.
+                        final List<ExecutionVertex> affectedVertices = snapshotAssignedVertices(allocatedResource);
+
+                        // Assign vertices back to a dummy resource first; checkAndReleaseAllocatedResource ignores
+                        // dummy resources, presumably so the FAILED transitions below do not release the dead one
+                        final DummyInstance dummyInstance = DummyInstance.createDummyInstance();
+                        final AllocatedResource dummyResource = new AllocatedResource(dummyInstance,
+                            new AllocationID());
+
+                        for (final ExecutionVertex vertex : affectedVertices) {
+                            vertex.setAllocatedResource(dummyResource);
+                        }
+
+                        final String failureMessage = allocatedResource.getInstance().getName() + " died";
+
+                        for (final ExecutionVertex vertex : affectedVertices) {
+                            switch (vertex.getExecutionState()) {
+                            case ASSIGNED:
+                            case READY:
+                            case STARTING:
+                            case RUNNING:
+                            case FINISHING:
+                                vertex.updateExecutionState(ExecutionState.FAILED, failureMessage);
+                                break;
+                            default:
+                                // Vertices in other states are left untouched
+                                break;
+                            }
+                        }
+                    }
+                }
+
+                final InternalJobStatus js = eg.getJobStatus();
+                if (js != InternalJobStatus.FAILING && js != InternalJobStatus.FAILED) {
+
+                    // TODO: Re-deploy the affected vertices once replacement resources have been allocated
+                    final ExecutionStage stage = eg.getCurrentExecutionStage();
+
+                    try {
+                        requestInstances(stage);
+                    } catch (InstanceException e) {
+                        // TODO: Cancel the entire job in this case
+                        LOG.error(StringUtils.stringifyException(e));
+                    }
+                }
+            }
+        };
+
+        eg.executeCommand(command);
+    }
+}