You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost in this plain-text rendering.)
Posted to commits@hama.apache.org by su...@apache.org on 2012/08/05 13:07:49 UTC
svn commit: r1369551 [1/3] - in /hama/branches/HAMA-505-branch: conf/
core/src/main/java/org/apache/hama/ core/src/main/java/org/apache/hama/bsp/
core/src/main/java/org/apache/hama/bsp/ft/
core/src/main/java/org/apache/hama/bsp/message/ core/src/main/j...
Author: surajsmenon
Date: Sun Aug 5 11:07:48 2012
New Revision: 1369551
URL: http://svn.apache.org/viewvc?rev=1369551&view=rev
Log:
Committing fixes for HAMA-557, HAMA-610, HAMA-611 and HAMA-587 to HAMA-505-branch.
Added:
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/UpdatePeerAction.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/BSPFaultTolerantService.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/FaultTolerantMasterService.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/FaultTolerantPeerService.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/MessageEventListener.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/BSPResource.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/RawSplitResource.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/TaskAllocationStrategy.java
hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestTaskAllocation.java
Modified:
hama/branches/HAMA-505-branch/conf/hama-default.xml
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/Constants.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPTask.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/GroomServer.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobStatus.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/LaunchTaskAction.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/TaskStatus.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/MasterSyncClient.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncBSPMasterClient.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncClient.java
hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java
hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java
Modified: hama/branches/HAMA-505-branch/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/conf/hama-default.xml?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/conf/hama-default.xml (original)
+++ hama/branches/HAMA-505-branch/conf/hama-default.xml Sun Aug 5 11:07:48 2012
@@ -120,7 +120,12 @@
<description>The maximum number of BSP tasks that will be run simultaneously
by a groom server.</description>
</property>
- <property>
+ <property>
+ <name>bsp.ft.enabled</name>
+ <value>false</value>
+ <description>Enable Fault Tolerance in BSP Task execution.</description>
+ </property>
+ <property>
<name>bsp.checkpoint.enabled</name>
<value>false</value>
<description>Enable Hama to checkpoint the messages transferred among BSP tasks during the BSP synchronization period.</description>
Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/Constants.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/Constants.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/Constants.java Sun Aug 5 11:07:48 2012
@@ -60,6 +60,24 @@ public interface Constants {
public static final String UTF8_ENCODING = "UTF-8";
public static final String MAX_TASKS_PER_GROOM = "bsp.tasks.maximum";
+
+ public static final String MAX_TASK_ATTEMPTS = "bsp.tasks.max.attempts";
+
+ public static final int DEFAULT_MAX_TASK_ATTEMPTS = 2;
+
+ ////////////////////////////////////////
+ // Task scheduler related constants
+ // //////////////////////////////////////
+
+ public static final String TASK_ALLOCATOR_CLASS = "bsp.taskalloc.class";
+
+ // //////////////////////////////////////
+ // Fault tolerance related constants
+ // //////////////////////////////////////
+
+ public static final String FAULT_TOLERANCE_FLAG = "bsp.ft.enabled";
+
+ public static final String FAULT_TOLERANCE_CLASS = "bsp.ft.class";
// //////////////////////////////////////
// Checkpointing related constants
Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Sun Aug 5 11:07:48 2012
@@ -1025,7 +1025,7 @@ public class BSPJobClient extends Config
return tokens;
}
- static class RawSplit implements Writable {
+ public static class RawSplit implements Writable {
private String splitClass;
private BytesWritable bytes = new BytesWritable();
private String[] locations;
Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPMaster.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPMaster.java Sun Aug 5 11:07:48 2012
@@ -28,12 +28,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -48,7 +48,6 @@ import org.apache.hadoop.security.Access
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.sync.BSPMasterSyncClient;
import org.apache.hama.bsp.sync.MasterSyncClient;
import org.apache.hama.bsp.sync.ZKSyncBSPMasterClient;
import org.apache.hama.http.HttpServer;
@@ -59,18 +58,23 @@ import org.apache.hama.ipc.MasterProtoco
import org.apache.hama.monitor.fd.FDProvider;
import org.apache.hama.monitor.fd.Supervisor;
import org.apache.hama.monitor.fd.UDPSupervisor;
-import org.apache.hama.zookeeper.QuorumPeer;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
/**
* BSPMaster is responsible to control all the groom servers and to manage bsp
- * jobs.
+ * jobs. It has the following responsibilities:
+ * <ol>
+ * <li> <b>Job submission</b>. BSPMaster is responsible for accepting new job
+ * requests and assigning the job to scheduler for scheduling BSP Tasks defined
+ * for the job.
+ * <li> <b>GroomServer monitoring</b>. BSPMaster keeps track of all the groom
+ * servers in the cluster. It is responsible for adding new grooms to the
+ * cluster and keeping tabs on all the grooms, and could blacklist a groom if
+ * it fails the availability requirement.
+ * <li> <b>Task status tracking</b>. BSPMaster keeps track of the status of
+ * every task in each job and handles job failures as requested by the jobs.
+ * </ol>
*/
public class BSPMaster implements JobSubmissionProtocol, MasterProtocol,
GroomServerManager, Watcher, MonitorManager {
@@ -80,8 +84,6 @@ public class BSPMaster implements JobSub
private static final int FS_ACCESS_RETRY_PERIOD = 10000;
private HamaConfiguration conf;
- ZooKeeper zk = null;
- private String bspRoot = null;
MasterSyncClient syncClient = null;
/**
@@ -126,7 +128,7 @@ public class BSPMaster implements JobSub
// Jobs' Meta Data
private Integer nextJobId = Integer.valueOf(1);
- // clients
+ private int totalSubmissions = 0; // how many jobs has been submitted by clients
private int totalTasks = 0; // currnetly running tasks
private int totalTaskCapacity; // max tasks that groom server can run
@@ -145,6 +147,13 @@ public class BSPMaster implements JobSub
private final AtomicReference<Supervisor> supervisor = new AtomicReference<Supervisor>();
+ /**
+ * ReportGroomStatusHandler keeps track of the status that each GroomServer
+ * reports for the tasks it is currently executing. Based on the reported
+ * status, it is responsible for issuing task recovery requests, updating
+ * job progress and other bookkeeping for currently running jobs.
+ */
private class ReportGroomStatusHandler implements DirectiveHandler {
@Override
@@ -181,8 +190,13 @@ public class BSPMaster implements JobSub
jip.getStatus().setProgress(ts.getSuperstepCount());
jip.getStatus().setSuperstepCount(ts.getSuperstepCount());
} else if (ts.getRunState() == TaskStatus.State.FAILED) {
- jip.status.setRunState(JobStatus.FAILED);
- jip.failedTask(tip, ts);
+ if(jip.handleFailure(tip)){
+ recoverTask(jip);
+ }
+ else {
+ jip.status.setRunState(JobStatus.FAILED);
+ jip.failedTask(tip, ts);
+ }
}
if (jip.getStatus().getRunState() == JobStatus.SUCCEEDED) {
for (JobInProgressListener listener : jobInProgressListeners) {
@@ -196,6 +210,7 @@ public class BSPMaster implements JobSub
jip.getStatus().setProgress(ts.getSuperstepCount());
jip.getStatus().setSuperstepCount(ts.getSuperstepCount());
} else if (jip.getStatus().getRunState() == JobStatus.KILLED) {
+
GroomProtocol worker = findGroomServer(tmpStatus);
Directive d1 = new DispatchTasksDirective(
new GroomServerAction[] { new KillTaskAction(ts.getTaskId()) });
@@ -448,11 +463,26 @@ public class BSPMaster implements JobSub
return conf.getLocalPath("bsp.local.dir", pathString);
}
+ /**
+ * Starts the BSP Master process.
+ * @param conf The Hama configuration.
+ * @return an instance of BSPMaster
+ * @throws IOException
+ * @throws InterruptedException
+ */
public static BSPMaster startMaster(HamaConfiguration conf)
throws IOException, InterruptedException {
return startMaster(conf, generateNewIdentifier());
}
+ /**
+ * Starts the BSP Master process
+ * @param conf The Hama configuration
+ * @param identifier Identifier for this BSPMaster instance.
+ * @return an instance of BSPMaster
+ * @throws IOException
+ * @throws InterruptedException
+ */
public static BSPMaster startMaster(HamaConfiguration conf, String identifier)
throws IOException, InterruptedException {
BSPMaster result = new BSPMaster(conf, identifier);
@@ -573,6 +603,11 @@ public class BSPMaster implements JobSub
JobInProgress job = new JobInProgress(jobID, new Path(jobFile), this,
this.conf);
+ ++totalSubmissions;
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Submitting job number = " + totalSubmissions +
+ " id = " + job.getJobID());
+ }
return addJob(jobID, job);
}
@@ -720,6 +755,21 @@ public class BSPMaster implements JobSub
}
return job.getStatus();
}
+
+ /**
+ * Recovers a failed task in the given job. To be called when a task in the
+ * job has failed and needs to be rescheduled on a machine; every registered
+ * JobInProgressListener is asked to handle it via recoverTaskInJob.
+ */
+ private synchronized void recoverTask(JobInProgress job) {
+ ++totalSubmissions;
+ for (JobInProgressListener listener : jobInProgressListeners) {
+ try {
+ listener.recoverTaskInJob(job);
+ } catch (IOException ioe) {
+ LOG.error("Fail to alter Scheduler a job is added.", ioe);
+ }
+ }
+ }
@Override
public JobStatus[] jobsToComplete() throws IOException {
@@ -836,11 +886,14 @@ public class BSPMaster implements JobSub
}
}
+ /**
+ * Shuts down the BSPMaster process and performs the necessary cleanup.
+ */
public void shutdown() {
try {
- this.zk.close();
- } catch (InterruptedException e) {
- e.printStackTrace();
+ this.syncClient.close();
+ } catch (IOException e) {
+ LOG.error("Error closing the sync client",e);
}
if (null != this.supervisor.get()) {
this.supervisor.get().stop();
Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Sun Aug 5 11:07:48 2012
@@ -19,30 +19,27 @@ package org.apache.hama.bsp;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.Arrays;
import java.util.Iterator;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.Constants;
import org.apache.hama.bsp.Counters.Counter;
+import org.apache.hama.bsp.ft.AsyncRcvdMsgCheckpointImpl;
+import org.apache.hama.bsp.ft.BSPFaultTolerantService;
+import org.apache.hama.bsp.ft.FaultTolerantPeerService;
import org.apache.hama.bsp.message.MessageManager;
import org.apache.hama.bsp.message.MessageManagerFactory;
import org.apache.hama.bsp.message.MessageQueue;
-import org.apache.hama.bsp.sync.BSPPeerSyncClient;
import org.apache.hama.bsp.sync.PeerSyncClient;
-import org.apache.hama.bsp.sync.SyncClient;
-import org.apache.hama.bsp.sync.SyncEventListener;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.bsp.sync.SyncServiceFactory;
import org.apache.hama.ipc.BSPPeerProtocol;
@@ -57,10 +54,7 @@ public final class BSPPeerImpl<K1, V1, K
private static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
public static enum PeerCounter {
- SUPERSTEP_SUM, SUPERSTEPS, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS,
- IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED,
- TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, COMPRESSED_BYTES_SENT,
- COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS
+ SUPERSTEP_SUM, SUPERSTEPS, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS, IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED, TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, COMPRESSED_BYTES_SENT, COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS
}
private final Configuration conf;
@@ -78,10 +72,6 @@ public final class BSPPeerImpl<K1, V1, K
private PeerSyncClient syncClient;
private MessageManager<M> messenger;
- // A checkpoint is initiated at the <checkPointInterval>th interval.
- private int checkPointInterval;
- private long lastCheckPointStep;
-
// IO
private int partition;
private String splitClass;
@@ -96,6 +86,8 @@ public final class BSPPeerImpl<K1, V1, K
private Counters counters;
private Combiner<M> combiner;
+ private FaultTolerantPeerService<M> faultToleranceService;
+
/**
* Protected default constructor for LocalBSPRunner.
*/
@@ -127,6 +119,13 @@ public final class BSPPeerImpl<K1, V1, K
this.counters = counters;
}
+ public BSPPeerImpl(BSPJob job, Configuration conf, TaskAttemptID taskId,
+ BSPPeerProtocol umbilical, int partition, String splitClass,
+ BytesWritable split, Counters counters) throws Exception {
+ this(job, conf, taskId, umbilical, partition, splitClass, split, counters,
+ -1, TaskStatus.State.RUNNING);
+ }
+
/**
* BSPPeer Constructor.
*
@@ -140,7 +139,8 @@ public final class BSPPeerImpl<K1, V1, K
@SuppressWarnings("unchecked")
public BSPPeerImpl(BSPJob job, Configuration conf, TaskAttemptID taskId,
BSPPeerProtocol umbilical, int partition, String splitClass,
- BytesWritable split, Counters counters) throws Exception {
+ BytesWritable split, Counters counters, long superstep,
+ TaskStatus.State state) throws Exception {
this.conf = conf;
this.taskId = taskId;
this.umbilical = umbilical;
@@ -153,28 +153,29 @@ public final class BSPPeerImpl<K1, V1, K
this.fs = FileSystem.get(conf);
- this.checkPointInterval = conf.getInt(Constants.CHECKPOINT_INTERVAL,
- Constants.DEFAULT_CHECKPOINT_INTERVAL);
- this.lastCheckPointStep = 0;
-
String bindAddress = conf.get(Constants.PEER_HOST,
Constants.DEFAULT_PEER_HOST);
int bindPort = conf
.getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
peerAddress = new InetSocketAddress(bindAddress, bindPort);
- initialize();
- syncClient.register(taskId.getJobID(), taskId, peerAddress.getHostName(),
- peerAddress.getPort());
- // initial barrier syncing to get all the hosts to the same point, to get
- // consistent peernames.
- syncClient.enterBarrier(taskId.getJobID(), taskId, -1);
- syncClient.leaveBarrier(taskId.getJobID(), taskId, -1);
- setCurrentTaskStatus(new TaskStatus(taskId.getJobID(), taskId, 1.0f,
- TaskStatus.State.RUNNING, "running", peerAddress.getHostName(),
- TaskStatus.Phase.STARTING, counters));
- messenger = MessageManagerFactory.getMessageManager(conf);
- messenger.init(taskId, this, conf, peerAddress);
+ initializeIO();
+ initializeSyncService(superstep, state);
+
+ TaskStatus.Phase phase = TaskStatus.Phase.STARTING;
+ String stateString = "running";
+ if (state == TaskStatus.State.RECOVERING) {
+ phase = TaskStatus.Phase.RECOVERING;
+ stateString = "recovering";
+ }
+
+ setCurrentTaskStatus(new TaskStatus(taskId.getJobID(), taskId, 1.0f, state,
+ stateString, peerAddress.getHostName(), phase, counters));
+
+ initilizeMessaging();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Initialized Messaging service.");
+ }
final String combinerName = conf.get("bsp.combiner.class");
if (combinerName != null) {
@@ -182,12 +183,57 @@ public final class BSPPeerImpl<K1, V1, K
conf.getClassByName(combinerName), conf);
}
+ if (conf.getBoolean(Constants.FAULT_TOLERANCE_FLAG, false)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Fault tolerance enabled.");
+ }
+ if (superstep > 0)
+ conf.setInt("attempt.superstep", (int) superstep);
+ Class<?> ftClass = conf.getClass(Constants.FAULT_TOLERANCE_CLASS,
+ AsyncRcvdMsgCheckpointImpl.class, BSPFaultTolerantService.class);
+ if (ftClass != null) {
+ if (superstep > 0) {
+ counters.incrCounter(PeerCounter.SUPERSTEP_SUM, superstep);
+ }
+
+ this.faultToleranceService = ((BSPFaultTolerantService<M>) ReflectionUtils
+ .newInstance(ftClass, null)).constructPeerFaultTolerance(job, this,
+ syncClient, peerAddress, this.taskId, superstep, conf, messenger);
+ TaskStatus.State newState = this.faultToleranceService
+ .onPeerInitialized(state);
+
+ if (state == TaskStatus.State.RECOVERING) {
+ if (newState == TaskStatus.State.RUNNING) {
+ phase = TaskStatus.Phase.STARTING;
+ stateString = "running";
+ state = newState;
+ }
+
+ setCurrentTaskStatus(new TaskStatus(taskId.getJobID(), taskId, 1.0f,
+ state, stateString, peerAddress.getHostName(), phase, counters));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("State after FT service initialization - "
+ + newState.toString());
+ }
+
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Initialized fault tolerance service");
+ }
+ }
+ }
+ doFirstSync(superstep);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.info(new StringBuffer("BSP Peer successfully initialized for ")
+ .append(this.taskId.toString()).append(" ").append(superstep)
+ .toString());
+ }
}
@SuppressWarnings("unchecked")
public final void initialize() throws Exception {
- syncClient = SyncServiceFactory.getPeerSyncClient(conf);
- syncClient.init(conf, taskId.getJobID(), taskId);
initInput();
@@ -239,60 +285,59 @@ public final class BSPPeerImpl<K1, V1, K
}
}
- @Override
- public final M getCurrentMessage() throws IOException {
- return messenger.getCurrentMessage();
+ public final void initilizeMessaging() throws ClassNotFoundException {
+ messenger = MessageManagerFactory.getMessageManager(conf);
+ messenger.init(taskId, this, conf, peerAddress);
}
- @Override
- public final void send(String peerName, M msg) throws IOException {
- incrementCounter(PeerCounter.TOTAL_MESSAGES_SENT, 1L);
- messenger.send(peerName, msg);
+ public final void initializeSyncService(long superstep, TaskStatus.State state)
+ throws Exception {
+
+ syncClient = SyncServiceFactory.getPeerSyncClient(conf);
+ syncClient.init(conf, taskId.getJobID(), taskId);
+ syncClient.register(taskId.getJobID(), taskId, peerAddress.getHostName(),
+ peerAddress.getPort());
}
- /*
- * returns true if the peer would checkpoint in the next sync.
- */
- public final boolean isReadyToCheckpoint() {
+ private void doFirstSync(long superstep) throws SyncException {
+ if (superstep > 0)
+ --superstep;
+ syncClient.enterBarrier(taskId.getJobID(), taskId, superstep);
+ syncClient.leaveBarrier(taskId.getJobID(), taskId, superstep);
+ }
- checkPointInterval = conf.getInt(Constants.CHECKPOINT_INTERVAL, 1);
- if (LOG.isDebugEnabled())
- LOG.debug(new StringBuffer(1000).append("Enabled = ")
- .append(conf.getBoolean(Constants.CHECKPOINT_ENABLED, false))
- .append(" checkPointInterval = ").append(checkPointInterval)
- .append(" lastCheckPointStep = ").append(lastCheckPointStep)
- .append(" getSuperstepCount() = ").append(getSuperstepCount())
- .toString());
+ @SuppressWarnings("unchecked")
+ public final void initializeIO() throws Exception {
+
+ initInput();
+
+ String outdir = null;
+ if (conf.get("bsp.output.dir") != null) {
+ Path outputDir = new Path(conf.get("bsp.output.dir",
+ "tmp-" + System.currentTimeMillis()), Task.getOutputName(partition));
+ outdir = outputDir.makeQualified(fs).toString();
+ }
+ outWriter = bspJob.getOutputFormat().getRecordWriter(fs, bspJob, outdir);
+ final RecordWriter<K2, V2> finalOut = outWriter;
- return (conf.getBoolean(Constants.CHECKPOINT_ENABLED, false)
- && (checkPointInterval != 0) && (((int) (getSuperstepCount() - lastCheckPointStep)) >= checkPointInterval));
+ collector = new OutputCollector<K2, V2>() {
+ @Override
+ public void collect(K2 key, V2 value) throws IOException {
+ finalOut.write(key, value);
+ }
+ };
}
- private final String checkpointedPath() {
- String backup = conf.get("bsp.checkpoint.prefix_path", "/checkpoint/");
- String ckptPath = backup + bspJob.getJobID().toString() + "/"
- + getSuperstepCount() + "/" + this.taskId.toString();
- if (LOG.isDebugEnabled())
- LOG.debug("Messages are to be saved to " + ckptPath);
- return ckptPath;
+ @Override
+ public final M getCurrentMessage() throws IOException {
+ return messenger.getCurrentMessage();
}
- final void checkpoint(String checkpointedPath, BSPMessageBundle<M> bundle) {
- FSDataOutputStream out = null;
- try {
- out = this.fs.create(new Path(checkpointedPath));
- bundle.write(out);
- } catch (IOException ioe) {
- LOG.warn("Fail checkpointing messages to " + checkpointedPath, ioe);
- } finally {
- try {
- if (null != out)
- out.close();
- } catch (IOException e) {
- LOG.warn("Fail to close dfs output stream while checkpointing.", e);
- }
- }
+ @Override
+ public final void send(String peerName, M msg) throws IOException {
+ incrementCounter(PeerCounter.TOTAL_MESSAGES_SENT, 1L);
+ messenger.send(peerName, msg);
}
/*
@@ -302,34 +347,44 @@ public final class BSPPeerImpl<K1, V1, K
@Override
public final void sync() throws IOException, SyncException,
InterruptedException {
- long startBarrier = System.currentTimeMillis();
- enterBarrier();
+
// normally all messages should been send now, finalizing the send phase
messenger.finishSendPhase();
Iterator<Entry<InetSocketAddress, MessageQueue<M>>> it = messenger
.getMessageIterator();
- boolean shouldCheckPoint;
-
- if ((shouldCheckPoint = isReadyToCheckpoint())) {
- lastCheckPointStep = getSuperstepCount();
- }
-
while (it.hasNext()) {
Entry<InetSocketAddress, MessageQueue<M>> entry = it.next();
final InetSocketAddress addr = entry.getKey();
final Iterable<M> messages = entry.getValue();
final BSPMessageBundle<M> bundle = combineMessages(messages);
+ // remove this message during runtime to save a bit of memory
+ it.remove();
+ try {
+ messenger.transfer(addr, bundle);
+ } catch (Exception e) {
+ LOG.error("Error while sending messages", e);
+ }
+ }
- if (shouldCheckPoint) {
- checkpoint(checkpointedPath(), bundle);
+ if (this.faultToleranceService != null) {
+ try {
+ this.faultToleranceService.beforeBarrier();
+ } catch (Exception e) {
+ throw new IOException(e);
}
+ }
- // remove this message during runtime to save a bit of memory
- it.remove();
+ long startBarrier = System.currentTimeMillis();
+ enterBarrier();
- messenger.transfer(addr, bundle);
+ if (this.faultToleranceService != null) {
+ try {
+ this.faultToleranceService.duringBarrier();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
}
leaveBarrier();
@@ -340,9 +395,38 @@ public final class BSPPeerImpl<K1, V1, K
currentTaskStatus.setCounters(counters);
+ if (this.faultToleranceService != null) {
+ try {
+ this.faultToleranceService.afterBarrier();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
umbilical.statusUpdate(taskId, currentTaskStatus);
// Clear outgoing queues.
messenger.clearOutgoingQueues();
+
+ // int msgsCount = -1;
+ // if (shouldCheckPoint) {
+ // msgsCount = checkpointReceivedMessages(checkpointedReceivePath());
+ // }
+ //
+ // this.syncClient.storeInformation(this.syncClient.constructKey(
+ // this.bspJob.getJobID(), "checkpoint", String.valueOf(getPeerIndex())),
+ // new IntWritable(msgsCount), false, null);
+
+ // if (msgsCount >= 0) {
+ // ArrayWritable writableArray = new ArrayWritable(IntWritable.class);
+ // Writable[] writeArr = new Writable[2];
+ // writeArr[0] = new IntWritable((int) getSuperstepCount());
+ // writeArr[1] = new IntWritable(msgsCount);
+ // writableArray.set(writeArr);
+ // this.syncClient.storeInformation(
+ // this.syncClient.constructKey(this.bspJob.getJobID(), "checkpoint",
+ // String.valueOf(getPeerIndex())), writableArray, true, null);
+ // }
+
}
private final BSPMessageBundle<M> combineMessages(Iterable<M> messages) {
@@ -424,8 +508,7 @@ public final class BSPPeerImpl<K1, V1, K
@Override
public int getPeerIndex() {
- initPeerNames();
- return Arrays.binarySearch(getAllPeerNames(), getPeerName());
+ return this.taskId.getTaskID().getId();
}
@Override
Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPTask.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPTask.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPTask.java Sun Aug 5 11:07:48 2012
@@ -69,7 +69,7 @@ public final class BSPTask extends Task
boolean shouldKillSelf = false;
try {
if (LOG.isDebugEnabled())
- LOG.debug("Pinging at time " + Calendar.getInstance().toString());
+ LOG.debug("Pinging at time " + Calendar.getInstance().getTimeInMillis());
// if the RPC call returns false, it means that groomserver does not
// have knowledge of this task.
shouldKillSelf = !(pingRPC.ping(taskId) && bspThread.isAlive());
Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/GroomServer.java Sun Aug 5 11:07:48 2012
@@ -163,19 +163,20 @@ public class GroomServer implements Runn
}
if (actions != null) {
- assignedPeerNames = new HashMap<TaskAttemptID, Integer>();
+ // assignedPeerNames = new HashMap<TaskAttemptID, Integer>();
int prevPort = Constants.DEFAULT_PEER_PORT;
for (GroomServerAction action : actions) {
if (action instanceof LaunchTaskAction) {
Task t = ((LaunchTaskAction) action).getTask();
- prevPort = BSPNetUtils.getNextAvailable(prevPort);
- assignedPeerNames.put(t.getTaskID(), prevPort);
-
+ synchronized (assignedPeerNames) {
+ prevPort = BSPNetUtils.getNextAvailable(prevPort);
+ assignedPeerNames.put(t.getTaskID(), prevPort);
+ }
LOG.info("Launch " + actions.length + " tasks.");
startNewTask((LaunchTaskAction) action);
- } else {
+ } else if (action instanceof KillTaskAction) {
// TODO Use the cleanup thread
// tasksToCleanup.put(action);
@@ -187,11 +188,30 @@ public class GroomServer implements Runn
tip.taskStatus.setRunState(TaskStatus.State.FAILED);
try {
tip.killAndCleanup(false);
+ tasks.remove(killAction.getTaskID());
+ runningTasks.remove(killAction.getTaskID());
} catch (IOException ioe) {
throw new DirectiveException("Error when killing a "
+ "TaskInProgress.", ioe);
}
}
+ } else if (action instanceof RecoverTaskAction) {
+ LOG.info("Recovery action task.");
+ RecoverTaskAction recoverAction = (RecoverTaskAction) action;
+ Task t = recoverAction.getTask();
+ LOG.info("Recovery action task." + t.getTaskID());
+ synchronized (assignedPeerNames) {
+ prevPort = BSPNetUtils.getNextAvailable(prevPort);
+ assignedPeerNames.put(t.getTaskID(), prevPort);
+ }
+ try {
+ startRecoveryTask(recoverAction);
+ } catch (IOException e) {
+ throw new DirectiveException(
+ new StringBuffer().append("Error starting the recovery task")
+ .append(t.getTaskID()).toString(),
+ e);
+ }
}
}
}
@@ -321,6 +341,8 @@ public class GroomServer implements Runn
this.conf.set(Constants.PEER_HOST, localHostname);
this.conf.set(Constants.GROOM_RPC_HOST, localHostname);
this.maxCurrentTasks = conf.getInt(Constants.MAX_TASKS_PER_GROOM, 3);
+ this.assignedPeerNames = new HashMap<TaskAttemptID, Integer>(
+ 2 * this.maxCurrentTasks);
int rpcPort = -1;
String rpcAddr = null;
@@ -571,6 +593,61 @@ public class GroomServer implements Runn
}
}
+  /**
+   * Starts a recovery attempt of a task on this groom server.
+   *
+   * Every attempt of the same task that this groom still tracks (matched on
+   * the task ID, ignoring the attempt number) has its child process killed and
+   * is removed from {@code tasks} and {@code runningTasks}. The new attempt,
+   * already marked as a recovery task, is then registered and its job is
+   * localized.
+   *
+   * @param action the recovery directive carrying the task and the superstep
+   *          count the recovered task starts from
+   * @throws IOException if the job configuration cannot be loaded, an earlier
+   *           attempt cannot be killed, or the job cannot be localized
+   */
+  private void startRecoveryTask(RecoverTaskAction action) throws IOException {
+    Task t = action.getTask();
+    BSPJob jobConf;
+    try {
+      jobConf = new BSPJob(t.getJobID(), t.getJobFile());
+    } catch (IOException e1) {
+      LOG.error("Error loading the job configuration for " + t.getTaskID(), e1);
+      throw e1;
+    }
+
+    TaskInProgress tip = new TaskInProgress(t, jobConf, this.groomServerName);
+    tip.markAsRecoveryTask(action.getSuperstepCount());
+    synchronized (this) {
+      // Stop and forget every earlier attempt of this task. Dropping an attempt
+      // from the maps without killing its runner would leave a child process
+      // running that this groom no longer tracks.
+      Iterator<TaskAttemptID> taskIterator = tasks.keySet().iterator();
+      while (taskIterator.hasNext()) {
+        TaskAttemptID taskAttId = taskIterator.next();
+        if (!taskAttId.getTaskID().equals(t.getTaskID().getTaskID())) {
+          continue;
+        }
+        TaskInProgress oldTip = tasks.get(taskAttId);
+        if (oldTip != null) {
+          try {
+            oldTip.killRunner();
+          } catch (IOException e) {
+            LOG.error("Error killing the current process for " + taskAttId, e);
+            throw e;
+          }
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Removing tasks with id = " + t.getTaskID().getTaskID());
+        }
+        taskIterator.remove();
+        runningTasks.remove(taskAttId);
+      }
+
+      tasks.put(t.getTaskID(), tip);
+      runningTasks.put(t.getTaskID(), tip);
+    }
+
+    try {
+      localizeJob(tip);
+    } catch (Throwable e) {
+      String msg = "Error initializing " + tip.getTask().getTaskID() + ":\n"
+          + StringUtils.stringifyException(e);
+      LOG.warn(msg);
+
+      try {
+        tip.killAndCleanup(true);
+      } catch (IOException ie2) {
+        LOG.info("Error cleaning up " + tip.getTask().getTaskID() + ":\n"
+            + StringUtils.stringifyException(ie2));
+      }
+      throw new IOException("Error localizing the job for "
+          + tip.getTask().getTaskID() + ".", e);
+    }
+  }
+
/**
* Update and report refresh status back to BSPMaster.
*/
@@ -730,13 +807,20 @@ public class GroomServer implements Runn
+ " monitorPeriod = "
+ monitorPeriod
+ " check = "
- + (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING) && (((tip.lastPingedTimestamp == 0 && ((currentTime - tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp > 0) && (currentTime - tip.lastPingedTimestamp) > monitorPeriod)))));
+ + (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING) &&
+ (((tip.lastPingedTimestamp == 0 &&
+ ((currentTime - tip.startTime) > 10 * monitorPeriod)) ||
+ ((tip.lastPingedTimestamp > 0) &&
+ (currentTime - tip.lastPingedTimestamp) > monitorPeriod)))));
// Task is out of contact if it has not pinged since more than
// monitorPeriod. A task is given a leeway of 10 times monitorPeriod
// to get started.
if (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING)
- && (((tip.lastPingedTimestamp == 0 && ((currentTime - tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp > 0) && (currentTime - tip.lastPingedTimestamp) > monitorPeriod)))) {
+ && (((tip.lastPingedTimestamp == 0
+ && ((currentTime - tip.startTime) > 10 * monitorPeriod))
+ || ((tip.lastPingedTimestamp > 0)
+ && (currentTime - tip.lastPingedTimestamp) > monitorPeriod)))) {
LOG.info("adding purge task: " + tip.getTask().getTaskID());
@@ -891,6 +975,7 @@ public class GroomServer implements Runn
private long startTime = 0L;
private volatile long lastPingedTimestamp = 0L;
+ private long startSuperstepCount = -1;
public TaskInProgress(Task task, BSPJob jobConf, String groomServer) {
this.task = task;
@@ -901,6 +986,15 @@ public class GroomServer implements Runn
TaskStatus.Phase.STARTING, task.getCounters());
}
+ /**
+ * Marks this task as a recovery attempt. Unless the task is already in the
+ * FAILED run state, its run state and phase are set to RECOVERING and its
+ * state string to "recovering". The given superstep count is recorded as the
+ * start superstep of this task in every case.
+ *
+ * @param superstepNumber superstep count to record as the start superstep
+ */
+ public void markAsRecoveryTask(long superstepNumber) {
+ if (this.taskStatus.getRunState() != TaskStatus.State.FAILED) {
+ this.taskStatus.setRunState(TaskStatus.State.RECOVERING);
+ this.taskStatus.setPhase(TaskStatus.Phase.RECOVERING);
+ this.taskStatus.setStateString("recovering");
+ }
+ this.startSuperstepCount = superstepNumber;
+ }
+
private void localizeTask(Task task) throws IOException {
Path localJobFile = this.jobConf.getLocalPath(SUBDIR + "/"
+ task.getTaskID() + "/job.xml");
@@ -954,8 +1048,22 @@ public class GroomServer implements Runn
// runner could be null if task-cleanup attempt is not localized yet
if (runner != null) {
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Killing process for " + this.task.getTaskID());
+ }
+ runner.killBsp();
+ }
+ runner = null;
+ }
+
+ public synchronized void killRunner() throws IOException {
+ if (runner != null) {
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Killing process for " + this.task.getTaskID());
+ }
runner.killBsp();
}
+ runner = null;
}
/**
@@ -1143,6 +1251,11 @@ public class GroomServer implements Runn
defaultConf.setInt("bsp.checkpoint.port", Integer.parseInt(args[4]));
}
defaultConf.setInt(Constants.PEER_PORT, peerPort);
+
+ long superstep = Long.parseLong(args[4]);
+ TaskStatus.State state = TaskStatus.State.valueOf(args[5]);
+ LOG.debug("Starting peer for sstep " + superstep + " state = " + state);
+
try {
// use job-specified working directory
@@ -1153,7 +1266,7 @@ public class GroomServer implements Runn
@SuppressWarnings("rawtypes")
final BSPPeerImpl<?, ?, ?, ?, ?> bspPeer = new BSPPeerImpl(job,
defaultConf, taskid, umbilical, task.partition, task.splitClass,
- task.split, task.getCounters());
+ task.split, task.getCounters(), superstep, state);
task.run(job, bspPeer, umbilical); // run the task
@@ -1195,6 +1308,24 @@ public class GroomServer implements Runn
}
}
+  /**
+   * Looks up the status of a task attempt tracked by this groom server.
+   *
+   * @param taskid the task attempt to look up
+   * @return the attempt's status, or {@code null} if the attempt is unknown
+   */
+  public TaskStatus getTaskStatus(TaskAttemptID taskid) {
+    TaskInProgress inProgress = tasks.get(taskid);
+    return (inProgress == null) ? null : inProgress.getStatus();
+  }
+
+  /**
+   * Looks up the superstep a tracked task attempt starts from.
+   *
+   * @param taskid the task attempt to look up
+   * @return the recorded start superstep, or -1 if the attempt is unknown
+   */
+  public long getStartSuperstep(TaskAttemptID taskid) {
+    TaskInProgress inProgress = tasks.get(taskid);
+    return (inProgress == null) ? -1L : inProgress.startSuperstepCount;
+  }
+
@Override
public boolean ping(TaskAttemptID taskid) throws IOException {
TaskInProgress tip = runningTasks.get(taskid);
@@ -1220,8 +1351,6 @@ public class GroomServer implements Runn
@Override
public void fsError(TaskAttemptID taskId, String message) throws IOException {
LOG.fatal("Task: " + taskId + " - Killed due to FSError: " + message);
- // TODO
-
}
@Override
@@ -1254,8 +1383,6 @@ public class GroomServer implements Runn
@Override
public void process(WatchedEvent event) {
- // TODO Auto-generated method stub
-
}
}
Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java Sun Aug 5 11:07:48 2012
@@ -28,7 +28,7 @@ import org.apache.hadoop.io.WritableUtil
* A generic directive from the {@link org.apache.hama.bsp.BSPMaster} to the
* {@link org.apache.hama.bsp.GroomServer} to take some 'action'.
*/
-abstract class GroomServerAction implements Writable {
+public abstract class GroomServerAction implements Writable {
/**
* Ennumeration of various 'actions' that the {@link BSPMaster} directs the
@@ -49,7 +49,13 @@ abstract class GroomServerAction impleme
REINIT_GROOM,
/** Ask a task to save its output. */
- COMMIT_TASK
+ COMMIT_TASK,
+
+ /** Recover a task from failure. */
+ RECOVER_TASK,
+
+ /** Update information on a peer. */
+ UPDATE_PEER
};
/**
@@ -73,7 +79,17 @@ abstract class GroomServerAction impleme
case KILL_JOB: {
action = new KillJobAction();
}
- break;
+ break;
+ case RECOVER_TASK:
+ {
+ action = new RecoverTaskAction();
+ }
+ break;
+ case UPDATE_PEER:
+ {
+ action = new UpdatePeerAction();
+ }
+ break;
case REINIT_GROOM: {
action = new ReinitGroomAction();
}
Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobInProgress.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobInProgress.java Sun Aug 5 11:07:48 2012
@@ -21,9 +21,11 @@ import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -32,14 +34,21 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hama.Constants;
+import org.apache.hama.bsp.ft.AsyncRcvdMsgCheckpointImpl;
+import org.apache.hama.bsp.ft.BSPFaultTolerantService;
+import org.apache.hama.bsp.ft.FaultTolerantMasterService;
import org.apache.hama.bsp.sync.MasterSyncClient;
+import org.apache.hama.bsp.taskallocation.BSPResource;
+import org.apache.hama.bsp.taskallocation.BestEffortDataLocalTaskAllocator;
+import org.apache.hama.bsp.taskallocation.TaskAllocationStrategy;
+import org.apache.hama.util.ReflectionUtils;
/**
* JobInProgress maintains all the info for keeping a Job on the straight and
* narrow. It keeps its JobProfile and its latest JobStatus, plus a set of
* tables for doing bookkeeping of its Tasks.ss
*/
-class JobInProgress {
+public class JobInProgress {
/**
* Used when the a kill is issued to a job which is initializing.
@@ -74,6 +83,8 @@ class JobInProgress {
long launchTime;
long finishTime;
+ int maxTaskAttempts;
+
private String jobName;
// private LocalFileSystem localFs;
@@ -89,11 +100,38 @@ class JobInProgress {
String jobSplit;
Map<Task, GroomServerStatus> taskToGroomMap;
+
// Used only for scheduling!
- Map<GroomServerStatus, Integer> tasksInGroomMap;
+ Map<GroomServerStatus, Integer> taskCountInGroomMap;
+
+ // If the task does not exist as key, it implies that the task did not fail
+ // before.
+ // Value in the map implies the attempt ID for which the key(task) was
+ // re-attempted before.
+ Map<Task, Integer> taskReattemptMap;
+
+ Set<TaskInProgress> recoveryTasks;
+
+ // This set keeps track of the tasks that have failed.
+ Set<Task> failedTasksTillNow;
private int taskCompletionEventTracker = 0;
+ private TaskAllocationStrategy taskAllocationStrategy;
+
+ private FaultTolerantMasterService faultToleranceService;
+
+ /**
+ * Creates a minimal job for unit tests only. Only the configuration and job
+ * ID are assigned here and the master is set to {@code null}; none of the
+ * other fields (tasks, profile, status, ...) are initialized by this
+ * constructor.
+ *
+ * @param jobId the ID of the job
+ * @param conf the configuration to use
+ */
+ public JobInProgress(BSPJobID jobId, Configuration conf){
+ this.conf = conf;
+ this.jobId = jobId;
+ master = null;
+ }
+
public JobInProgress(BSPJobID jobId, Path jobFile, BSPMaster master,
Configuration conf) throws IOException {
this.conf = conf;
@@ -102,10 +140,6 @@ class JobInProgress {
this.jobFile = jobFile;
this.master = master;
- this.taskToGroomMap = new HashMap<Task, GroomServerStatus>(2 * tasks.length);
-
- this.tasksInGroomMap = new HashMap<GroomServerStatus, Integer>();
-
this.status = new JobStatus(jobId, null, 0L, 0L,
JobStatus.State.PREP.value(), counters);
this.startTime = System.currentTimeMillis();
@@ -127,6 +161,9 @@ class JobInProgress {
this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>(
numBSPTasks + 10);
+ this.maxTaskAttempts = job.getConf().getInt(Constants.MAX_TASK_ATTEMPTS,
+ Constants.DEFAULT_MAX_TASK_ATTEMPTS);
+
this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(),
job.getJobName());
@@ -140,6 +177,8 @@ class JobInProgress {
fs.copyToLocalFile(new Path(jarFile), localJarFile);
}
+ failedTasksTillNow = new HashSet<Task>(2 * tasks.length);
+
}
public JobProfile getProfile() {
@@ -229,15 +268,20 @@ class JobInProgress {
this.tasks = new TaskInProgress[numBSPTasks];
for (int i = 0; i < numBSPTasks; i++) {
tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(),
- splits[i], this.master, this.conf, this, i);
+ splits[i], this.conf, this, i);
}
} else {
this.tasks = new TaskInProgress[numBSPTasks];
for (int i = 0; i < numBSPTasks; i++) {
tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(),
- null, this.master, this.conf, this, i);
+ null, this.conf, this, i);
}
}
+ this.taskToGroomMap = new HashMap<Task, GroomServerStatus>(2 * tasks.length);
+
+ this.taskCountInGroomMap = new HashMap<GroomServerStatus, Integer>();
+
+ this.recoveryTasks = new HashSet<TaskInProgress>(2 * tasks.length);
// Update job status
this.status = new JobStatus(this.status.getJobID(), this.profile.getUser(),
@@ -248,6 +292,31 @@ class JobInProgress {
syncClient.registerJob(this.getJobID().toString());
tasksInited = true;
+
+ Class<?> taskAllocatorClass = conf.getClass(Constants.TASK_ALLOCATOR_CLASS,
+ BestEffortDataLocalTaskAllocator.class, TaskAllocationStrategy.class);
+ this.taskAllocationStrategy = (TaskAllocationStrategy) ReflectionUtils
+ .newInstance(taskAllocatorClass, new Object[0]);
+
+ if (conf.getBoolean(Constants.FAULT_TOLERANCE_FLAG, false)) {
+
+ Class<?> ftClass = conf.getClass(Constants.FAULT_TOLERANCE_CLASS,
+ AsyncRcvdMsgCheckpointImpl.class ,
+ BSPFaultTolerantService.class);
+ if (ftClass != null) {
+ try {
+ faultToleranceService = ((BSPFaultTolerantService<?>) ReflectionUtils
+ .newInstance(ftClass, new Object[0]))
+ .constructMasterFaultTolerance(jobId, maxTaskAttempts, tasks,
+ conf, master.getSyncClient(), taskAllocationStrategy);
+ LOG.info("Initialized fault tolerance service with "
+ + ftClass.getCanonicalName());
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
LOG.info("Job is initialized.");
}
@@ -269,29 +338,54 @@ class JobInProgress {
}
Task result = null;
+ BSPResource[] resources = new BSPResource[0];
- try {
- for (int i = 0; i < tasks.length; i++) {
- if (!tasks[i].isRunning() && !tasks[i].isComplete()) {
- result = tasks[i].getTaskToRun(groomStatuses, tasksInGroomMap);
- if (result != null)
- this.taskToGroomMap.put(result, tasks[i].getGroomServerStatus());
- int taskInGroom = 0;
- if (tasksInGroomMap.containsKey(tasks[i].getGroomServerStatus())) {
- taskInGroom = tasksInGroomMap.get(tasks[i].getGroomServerStatus());
- }
- tasksInGroomMap.put(tasks[i].getGroomServerStatus(), taskInGroom + 1);
- break;
+ for (int i = 0; i < tasks.length; i++) {
+ if (!tasks[i].isRunning() && !tasks[i].isComplete()) {
+
+ String[] selectedGrooms = taskAllocationStrategy.selectGrooms(
+ groomStatuses, taskCountInGroomMap, resources, tasks[i]);
+ GroomServerStatus groomStatus = taskAllocationStrategy
+ .getGroomToAllocate(groomStatuses, selectedGrooms,
+ taskCountInGroomMap, resources, tasks[i]);
+ if (groomStatus != null)
+ result = tasks[i].constructTask(groomStatus);
+ if (result != null) {
+ updateGroomTaskDetails(tasks[i].getGroomServerStatus(), result);
}
+ break;
}
-
- } catch (IOException e) {
- LOG.error("Exception while obtaining new task!", e);
}
+
counters.incrCounter(JobCounter.LAUNCHED_TASKS, 1L);
return result;
}
+  /**
+   * Hands every task currently queued for recovery to the fault tolerance
+   * service, which fills {@code actionMap} with the directives to send to the
+   * groom servers. The recovery queue is emptied as it is fetched. Does
+   * nothing when no fault tolerance service is configured.
+   *
+   * @param groomStatuses the known groom servers
+   * @param actionMap receives the per-groom directives to dispatch
+   * @throws IOException if the fault tolerance service fails to recover
+   */
+  public void recoverTasks(Map<String, GroomServerStatus> groomStatuses,
+      Map<GroomServerStatus, List<GroomServerAction>> actionMap)
+      throws IOException {
+    if (this.faultToleranceService == null) {
+      return;
+    }
+    this.faultToleranceService.recoverTasks(this, groomStatuses,
+        fetchAndClearTasksToRecover(), tasks, taskCountInGroomMap, actionMap);
+  }
+
+  /**
+   * Records that {@code task} was assigned to {@code groomStatus} and
+   * increments the number of this job's tasks counted against that groom.
+   */
+  private void updateGroomTaskDetails(GroomServerStatus groomStatus, Task task) {
+    taskToGroomMap.put(task, groomStatus);
+    int assigned = taskCountInGroomMap.containsKey(groomStatus)
+        ? taskCountInGroomMap.get(groomStatus)
+        : 0;
+    taskCountInGroomMap.put(groomStatus, assigned + 1);
+  }
+
/**
* Hosts that tasks run on.
*
@@ -305,6 +399,14 @@ class JobInProgress {
return list;
}
+ /**
+ * Mark the completed task status. If all the tasks are completed the status
+ * of the job is updated to notify the client on the completion of the whole
+ * job.
+ *
+ * @param tip <code>TaskInProgress</code> object representing task.
+ * @param status The completed task status
+ */
public synchronized void completedTask(TaskInProgress tip, TaskStatus status) {
TaskAttemptID taskid = status.getTaskId();
updateTaskStatus(tip, status);
@@ -339,6 +441,12 @@ class JobInProgress {
}
}
+ /**
+ * Mark failure of a task.
+ *
+ * @param tip <code>TaskInProgress</code> object representing task.
+ * @param status The failed task status
+ */
public void failedTask(TaskInProgress tip, TaskStatus status) {
TaskAttemptID taskid = status.getTaskId();
updateTaskStatus(tip, status);
@@ -354,8 +462,6 @@ class JobInProgress {
}
}
- // TODO
-
if (!allDone) {
// Kill job
this.kill();
@@ -372,6 +478,12 @@ class JobInProgress {
}
}
+ /**
+ * Updates the task status of the task.
+ *
+ * @param tip <code>TaskInProgress</code> representing task
+ * @param taskStatus The status of the task.
+ */
public synchronized void updateTaskStatus(TaskInProgress tip,
TaskStatus taskStatus) {
TaskAttemptID taskid = taskStatus.getTaskId();
@@ -415,6 +527,9 @@ class JobInProgress {
}
}
+ /**
+ * Kill the job.
+ */
public synchronized void kill() {
if (status.getRunState() != JobStatus.KILLED) {
this.status = new JobStatus(status.getJobID(), this.profile.getUser(),
@@ -439,6 +554,12 @@ class JobInProgress {
*/
synchronized void garbageCollect() {
try {
+
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Removing " + localJobFile + " and " + localJarFile
+ + " getJobFile = " + profile.getJobFile());
+ }
+
// Definitely remove the local-disk copy of the job file
if (localJobFile != null) {
localFs.delete(localJobFile, true);
@@ -502,4 +623,62 @@ class JobInProgress {
return events;
}
+ /**
+ * Returns the configured maximum number of times a task of this job may be
+ * re-attempted. The main constructor reads it from
+ * {@code Constants.MAX_TASK_ATTEMPTS} (default
+ * {@code Constants.DEFAULT_MAX_TASK_ATTEMPTS}); it stays 0 for jobs built with
+ * the unit-test constructor.
+ *
+ * @return the maximum number of task attempts
+ */
+ int getMaximumReAttempts() {
+ return maxTaskAttempts;
+ }
+
+  /**
+   * Decides whether a failed task should be restarted. If so, the task is
+   * queued for recovery and the job is moved into the RECOVERING state.
+   *
+   * @param tip the failed task
+   * @return true if the task was queued for recovery; false if fault tolerance
+   *         is disabled, the service deems recovery impossible, or the service
+   *         reports the task as already recovered
+   */
+  synchronized boolean handleFailure(TaskInProgress tip) {
+    if (this.faultToleranceService == null) {
+      return false;
+    }
+    if (!faultToleranceService.isRecoveryPossible(tip)) {
+      return false;
+    }
+    if (faultToleranceService.isAlreadyRecovered(tip)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Avoiding recovery task " + tip.getCurrentTaskAttemptId());
+      }
+      return false;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Adding recovery task " + tip.getCurrentTaskAttemptId());
+    }
+    recoveryTasks.add(tip);
+    status.setRunState(JobStatus.RECOVERING);
+    return true;
+  }
+
+
+  /**
+   * Drains the recovery queue.
+   *
+   * @return the tasks in progress that have to be recovered, as an array (empty
+   *         when nothing is pending); the queue is empty afterwards
+   */
+  synchronized TaskInProgress[] fetchAndClearTasksToRecover() {
+    TaskInProgress[] pending = recoveryTasks
+        .toArray(new TaskInProgress[recoveryTasks.size()]);
+    recoveryTasks.clear();
+    return pending;
+  }
+
+  /**
+   * Returns whether any failed task is queued for recovery.
+   *
+   * Synchronized like {@code handleFailure} and
+   * {@code fetchAndClearTasksToRecover}, which modify the queue, because the
+   * queue is a plain {@code HashSet}. Returns false while the queue has not
+   * been created yet (it is created when the tasks are initialized).
+   *
+   * @return true if at least one task awaits recovery
+   */
+  public synchronized boolean isRecoveryPending() {
+    return recoveryTasks != null && !recoveryTasks.isEmpty();
+  }
+
+ /**
+ * Returns the tasks that have been assigned to a groom server. The returned
+ * set is the live key set of the task-to-groom map, not a copy.
+ */
+ public Set<Task> getTaskSet() {
+ return taskToGroomMap.keySet();
+ }
+
+ /**
+ * Returns the master-side fault tolerance service of this job, or
+ * {@code null} if fault tolerance is disabled or the tasks have not been
+ * initialized yet.
+ */
+ public FaultTolerantMasterService getFaultToleranceService() {
+ return this.faultToleranceService;
+ }
+
}
Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java Sun Aug 5 11:07:48 2012
@@ -40,5 +40,13 @@ abstract class JobInProgressListener {
* @throws IOException
*/
public abstract void jobRemoved(JobInProgress job) throws IOException;
+
+ /**
+ * Invoked when tasks of a job have to be recovered by {@link BSPMaster}.
+ * The tasks to recover are tracked by the job itself.
+ *
+ * @param job the job whose tasks have to be recovered
+ * @throws IOException if the listener fails to handle the request
+ */
+ public abstract void recoverTaskInJob(JobInProgress job) throws IOException;
}
Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobStatus.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobStatus.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobStatus.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobStatus.java Sun Aug 5 11:07:48 2012
@@ -44,7 +44,7 @@ public class JobStatus implements Writab
}
public static enum State {
- RUNNING(1), SUCCEEDED(2), FAILED(3), PREP(4), KILLED(5);
+ RUNNING(1), SUCCEEDED(2), FAILED(3), PREP(4), KILLED(5), RECOVERING(6);
int s;
State(int s) {
@@ -74,6 +74,9 @@ public class JobStatus implements Writab
case KILLED:
name = "KILLED";
break;
+ case RECOVERING:
+ name = "RECOVERING";
+ break;
}
return name;
@@ -86,6 +89,7 @@ public class JobStatus implements Writab
public static final int FAILED = 3;
public static final int PREP = 4;
public static final int KILLED = 5;
+ public static final int RECOVERING = 6;
private BSPJobID jobid;
private long progress;
Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/LaunchTaskAction.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/LaunchTaskAction.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/LaunchTaskAction.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/LaunchTaskAction.java Sun Aug 5 11:07:48 2012
@@ -23,7 +23,7 @@ import java.io.IOException;
/**
* Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} to the
- * {@link org.apache.hama.bsp.GroomServer} to launch a new task.
+ * {@link org.apache.hama.bsp.GroomServer} to launch a new task.
*/
class LaunchTaskAction extends GroomServerAction {
private Task task;
Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Sun Aug 5 11:07:48 2012
@@ -46,6 +46,7 @@ import org.apache.hama.HamaConfiguration
import org.apache.hama.bsp.BSPJobClient.RawSplit;
import org.apache.hama.bsp.BSPMaster.State;
import org.apache.hama.bsp.message.MemoryQueue;
+import org.apache.hama.bsp.message.MessageEventListener;
import org.apache.hama.bsp.message.MessageManager;
import org.apache.hama.bsp.message.MessageManagerFactory;
import org.apache.hama.bsp.message.MessageQueue;
@@ -407,9 +408,30 @@ public class LocalBSPRunner implements J
@Override
public void finishSendPhase() throws IOException {
- // TODO Auto-generated method stub
+ }
+
+ /**
+ * Delivers every message of the bundle back to this peer by passing each one
+ * to {@link #loopBackMessage(Writable)}.
+ */
+ @Override
+ public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle) {
+ for (Writable value : bundle.getMessages()) {
+ loopBackMessage(value);
+ }
+
+ }
+
+ /**
+ * Adds the message to the local incoming queue and increments the peer's
+ * TOTAL_MESSAGES_RECEIVED counter by one.
+ * NOTE(review): the cast to M is unchecked; assumes callers only pass
+ * messages of the peer's message type.
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public void loopBackMessage(Writable message) {
+ localIncomingMessages.add((M)message);
+ peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
+ 1L);
+ }
+ @Override
+ public void registerListener(MessageEventListener<M> listener)
+ throws IOException {
}
+
+
}
@@ -528,39 +550,33 @@ public class LocalBSPRunner implements J
}
@Override
- public void close() throws InterruptedException {
+ public void close() throws IOException {
barrier = null;
}
@Override
- public Writable getInformation(String key,
- Class<? extends Writable> classType) {
- // TODO Auto-generated method stub
- return null;
+ public boolean getInformation(String key, Writable writableVal) {
+ return false;
}
@Override
public String constructKey(BSPJobID jobId, String... args) {
- // TODO Auto-generated method stub
return null;
}
@Override
public boolean storeInformation(String key, Writable value,
boolean permanent, SyncEventListener listener) {
- // TODO Auto-generated method stub
return false;
}
@Override
public boolean addKey(String key, boolean permanent,
SyncEventListener listener) {
- // TODO Auto-generated method stub
return false;
}
@Override
public boolean hasKey(String key) {
- // TODO Auto-generated method stub
return false;
}
@@ -568,16 +584,19 @@ public class LocalBSPRunner implements J
public boolean registerListener(String key,
SyncEvent event,
SyncEventListener listener) {
- // TODO Auto-generated method stub
return false;
}
@Override
public String[] getChildKeySet(String key, SyncEventListener listener) {
- // TODO Auto-generated method stub
return null;
}
+ /**
+ * No-op in the local runner: nothing is removed and false is returned.
+ */
+ @Override
+ public boolean remove(String key, SyncEventListener listener) {
+ return false;
+ }
+
}
Added: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java?rev=1369551&view=auto
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java (added)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java Sun Aug 5 11:07:48 2012
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} to the
+ * {@link org.apache.hama.bsp.GroomServer} to relaunch a failed task as a
+ * recovery task. Besides the task itself, the directive carries the superstep
+ * count the recovered task starts from.
+ */
+public class RecoverTaskAction extends GroomServerAction {
+  private Task task;
+  private LongWritable superstepNumber;
+
+  /**
+   * Creates an empty action to be populated by {@link #readFields}; the
+   * superstep count defaults to -1.
+   */
+  public RecoverTaskAction() {
+    super(ActionType.RECOVER_TASK);
+    superstepNumber = new LongWritable(-1L);
+  }
+
+  /**
+   * @param task the task to recover
+   * @param superstep the superstep count the recovered task starts from
+   */
+  public RecoverTaskAction(Task task, long superstep) {
+    super(ActionType.RECOVER_TASK);
+    this.task = task;
+    this.superstepNumber = new LongWritable(superstep);
+  }
+
+  /** @return the task to recover */
+  public Task getTask() {
+    return task;
+  }
+
+  /** @return the superstep count the recovered task starts from */
+  public long getSuperstepCount() {
+    return superstepNumber.get();
+  }
+
+  /** Serializes the task followed by the superstep count. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    task.write(out);
+    superstepNumber.write(out);
+  }
+
+  /** Reads the task (as a {@link BSPTask}) followed by the superstep count. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    task = new BSPTask();
+    task.readFields(in);
+    superstepNumber.readFields(in);
+  }
+
+}
Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java Sun Aug 5 11:07:48 2012
@@ -43,6 +43,7 @@ import java.util.concurrent.atomic.Atomi
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.sync.ZKSyncBSPMasterClient;
import org.apache.hama.ipc.GroomProtocol;
import org.apache.hama.monitor.Federator;
import org.apache.hama.monitor.Federator.Act;
@@ -121,6 +122,12 @@ class SimpleTaskScheduler extends TaskSc
public void jobRemoved(JobInProgress job) throws IOException {
queueManager.get().moveJob(PROCESSING_QUEUE, FINISHED_QUEUE, job);
}
+
+ /**
+ * Handles a recovery request by adding the job back to the wait queue.
+ */
+ @Override
+ public void recoverTaskInJob(JobInProgress job) throws IOException {
+ queueManager.get().addJob(WAIT_QUEUE, job);
+ }
+
}
private class JobProcessor extends Thread implements Schedulable {
@@ -221,11 +228,10 @@ class SimpleTaskScheduler extends TaskSc
throw new NullPointerException("No job is specified.");
}
- @Override
- public Boolean call() {
+ private Boolean scheduleNewTasks() {
// Action to be sent for each task to the respective groom server.
- Map<GroomServerStatus, List<LaunchTaskAction>> actionMap = new HashMap<GroomServerStatus, List<LaunchTaskAction>>(
+ Map<GroomServerStatus, List<GroomServerAction>> actionMap = new HashMap<GroomServerStatus, List<GroomServerAction>>(
2 * this.groomStatuses.size());
Set<Task> taskSet = new HashSet<Task>(2 * jip.tasks.length);
Task t = null;
@@ -240,6 +246,7 @@ class SimpleTaskScheduler extends TaskSc
// if all tasks could not be scheduled
if (cnt != this.jip.tasks.length) {
+ LOG.error("Could not schedule all tasks!");
return Boolean.FALSE;
}
@@ -248,21 +255,49 @@ class SimpleTaskScheduler extends TaskSc
while (taskIter.hasNext()) {
Task task = taskIter.next();
GroomServerStatus groomStatus = jip.getGroomStatusForTask(task);
- List<LaunchTaskAction> taskActions = actionMap.get(groomStatus);
+ List<GroomServerAction> taskActions = actionMap.get(groomStatus);
if (taskActions == null) {
- taskActions = new ArrayList<LaunchTaskAction>(
+ taskActions = new ArrayList<GroomServerAction>(
groomStatus.getMaxTasks());
}
taskActions.add(new LaunchTaskAction(task));
actionMap.put(groomStatus, taskActions);
}
+ sendDirectivesToGrooms(actionMap);
+
+ return Boolean.TRUE;
+ }
+
+ /**
+ * Schedules recovery tasks: lets the job fill a per-groom map of recovery
+ * directives and then dispatches those directives to the grooms.
+ *
+ * @return TRUE if the directives were computed and dispatched successfully,
+ * FALSE otherwise
+ */
+ private Boolean scheduleRecoveryTasks() {
+ // Action to be sent for each task to the respective groom server.
+ Map<GroomServerStatus, List<GroomServerAction>> actionMap = new HashMap<GroomServerStatus, List<GroomServerAction>>(
+ 2 * this.groomStatuses.size());
+
+ try {
+ jip.recoverTasks(groomStatuses, actionMap);
+ } catch (IOException e) {
+ // Keep the cause: without it a failed recovery cannot be diagnosed.
+ LOG.error("Could not schedule recovery tasks for job " + jip.getJobID(),
+ e);
+ return Boolean.FALSE;
+ }
+ return sendDirectivesToGrooms(actionMap);
+ }
+
+ /**
+ * Dispatches the given directives to their groom servers, as long as the
+ * job is RUNNING or RECOVERING.
+ *
+ * @param actionMap directives to send, keyed by target groom server
+ * @return TRUE if every groom was served; FALSE if a dispatch failed or the
+ * job left the RUNNING/RECOVERING states before every groom was served
+ */
+ private Boolean sendDirectivesToGrooms(
+ Map<GroomServerStatus, List<GroomServerAction>> actionMap) {
Iterator<GroomServerStatus> groomIter = actionMap.keySet().iterator();
- while (jip.getStatus().getRunState() == JobStatus.RUNNING
+ while ((jip.getStatus().getRunState() == JobStatus.RUNNING || jip
+ .getStatus().getRunState() == JobStatus.RECOVERING)
&& groomIter.hasNext()) {
GroomServerStatus groomStatus = groomIter.next();
- List<LaunchTaskAction> actionList = actionMap.get(groomStatus);
+ List<GroomServerAction> actionList = actionMap.get(groomStatus);
GroomProtocol worker = groomServerManager.get().findGroomServer(
groomStatus);
@@ -276,18 +311,29 @@ class SimpleTaskScheduler extends TaskSc
LOG.error(
"Fail to dispatch tasks to GroomServer "
+ groomStatus.getGroomName(), ioe);
+ return Boolean.FALSE;
}
}
+ // Unserved grooms mean the loop stopped because the job is in neither
+ // schedulable state, so BOTH inequalities must hold ('||' is always true).
if (groomIter.hasNext()
- && jip.getStatus().getRunState() != JobStatus.RUNNING) {
+ && (jip.getStatus().getRunState() != JobStatus.RUNNING && jip
+ .getStatus().getRunState() != JobStatus.RECOVERING)) {
LOG.warn("Currently master only shcedules job in running state. "
+ "This may be refined in the future. JobId:" + jip.getJobID());
+ return Boolean.FALSE;
}
return Boolean.TRUE;
}
+
+ /**
+ * Schedules this job's tasks: recovery tasks when a recovery is pending,
+ * otherwise the initial set of tasks.
+ *
+ * @return TRUE if scheduling succeeded, FALSE otherwise
+ */
+ @Override
+ public Boolean call() {
+ return jip.isRecoveryPending() ? scheduleRecoveryTasks()
+ : scheduleNewTasks();
+ }
}
/**
@@ -365,8 +411,10 @@ class SimpleTaskScheduler extends TaskSc
this.jobProcessor.start();
if (null != getConf()
&& getConf().getBoolean("bsp.federator.enabled", false)) {
- this.scheduler.scheduleAtFixedRate(new JvmCollector(federator.get(),
- ((BSPMaster) groomServerManager.get()).zk), 5, 5, SECONDS);
+ this.scheduler.scheduleAtFixedRate(
+ new JvmCollector(federator.get(),
+ ((ZKSyncBSPMasterClient) ((BSPMaster) groomServerManager.get())
+ .getSyncClient()).getZK()), 5, 5, SECONDS);
}
if (null != monitorManager.get()) {
Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java Sun Aug 5 11:07:48 2012
@@ -34,7 +34,7 @@ import org.apache.hama.bsp.BSPJobClient.
* TaskInProgress maintains all the info needed for a Task in the lifetime of
* its owning Job.
*/
-class TaskInProgress {
+public class TaskInProgress {
public static final Log LOG = LogFactory.getLog(TaskInProgress.class);
private Configuration conf;
@@ -48,7 +48,6 @@ class TaskInProgress {
// Job Meta
private String jobFile = null;
private int partition;
- private BSPMaster bspMaster;
private TaskID id;
private JobInProgress job;
private int completes = 0;
@@ -69,6 +68,8 @@ class TaskInProgress {
// The first taskid of this tip
private TaskAttemptID firstTaskId;
+
+ private TaskAttemptID currentTaskId;
// Map from task Id -> GroomServer Id, contains tasks that are
// currently runnings
@@ -84,6 +85,8 @@ class TaskInProgress {
private RawSplit rawSplit;
+ private int mySuperstep = -1;
+
/**
* Constructor for new nexus between BSPMaster and GroomServer.
*
@@ -99,12 +102,20 @@ class TaskInProgress {
init(jobId);
}
+ /**
+ *
+ * @param jobId
+ * @param jobFile
+ * @param rawSplit
+ * @param conf
+ * @param job
+ * @param partition
+ */
public TaskInProgress(BSPJobID jobId, String jobFile, RawSplit rawSplit,
- BSPMaster master, Configuration conf, JobInProgress job, int partition) {
+ Configuration conf, JobInProgress job, int partition) {
this.jobId = jobId;
this.jobFile = jobFile;
this.rawSplit = rawSplit;
- this.setBspMaster(master);
this.job = job;
this.setConf(conf);
this.partition = partition;
@@ -112,18 +123,178 @@ class TaskInProgress {
init(jobId);
}
+ /**
+ * Initializes this TIP's id and start time. Reads the current value of the
+ * {@code partition} field, so that field must be assigned before the call.
+ *
+ * @param jobId id of the job this TIP belongs to
+ */
private void init(BSPJobID jobId) {
this.id = new TaskID(jobId, partition);
this.startTime = System.currentTimeMillis();
}
/**
- * Return a Task that can be sent to a GroomServer for execution.
+ * Picks, among the preferred locations, a groom server that still has a
+ * free task slot.
+ *
+ * @param taskid attempt being placed (not consulted by the selection)
+ * @param grooms known groom servers, presumably keyed by host name
+ * @param tasksInGroomMap number of tasks already assigned per groom
+ * @param possibleLocations preferred host names, checked in order
+ * @return host name of the first preferred groom with spare capacity, or
+ * null if none qualifies
+ */
+ private String getGroomToSchedule(TaskAttemptID taskid,
+ Map<String, GroomServerStatus> grooms,
+ Map<GroomServerStatus, Integer> tasksInGroomMap,
+ String[] possibleLocations) {
+ for (String host : possibleLocations) {
+ GroomServerStatus candidate = grooms.get(host);
+ if (candidate != null) {
+ Integer assigned = tasksInGroomMap.get(candidate);
+ int used = (assigned == null) ? 0 : assigned.intValue();
+ boolean hasFreeSlot = used < candidate.getMaxTasks();
+ if (hasFreeSlot && host.equals(candidate.getGroomHostName())) {
+ return candidate.getGroomHostName();
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Picks any known groom server that still has a free task slot, ignoring
+ * data locality.
+ *
+ * @param grooms known groom servers
+ * @param tasksInGroomMap number of tasks already assigned per groom
+ * @return host name of the first groom (in map iteration order) with spare
+ * capacity, or null if every groom is full
+ */
+ private String getAnyGroomToSchedule(Map<String, GroomServerStatus> grooms,
+ Map<GroomServerStatus, Integer> tasksInGroomMap) {
+ for (String key : grooms.keySet()) {
+ GroomServerStatus candidate = grooms.get(key);
+ if (candidate == null) {
+ continue;
+ }
+ Integer assigned = tasksInGroomMap.get(candidate);
+ int used = (assigned != null) ? assigned.intValue() : 0;
+ if (used < candidate.getMaxTasks()) {
+ return candidate.getGroomHostName();
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Creates a new attempt of this TIP bound to the given groom server. The
+ * task carries this TIP's input split, if it has one.
+ *
+ * @param groomStatus groom server the attempt is assigned to
+ * @return the new task, or null if {@code groomStatus} is null or no
+ * further attempts are allowed for this TIP
*/
- public Task getTaskToRun(Map<String, GroomServerStatus> grooms,
- Map<GroomServerStatus, Integer> tasksInGroomMap) throws IOException {
+ public Task constructTask(GroomServerStatus groomStatus) {
+ if (groomStatus == null) {
+ return null;
+ }
+ TaskAttemptID taskId = computeTaskId();
+ if (taskId == null) {
+ return null;
+ }
+ // Attach the input split, if any (the replaced getTaskToRun did the same);
+ // otherwise the task is created without its split information.
+ String splitClass = null;
+ BytesWritable split = null;
+ if (rawSplit != null) {
+ splitClass = rawSplit.getClassName();
+ split = rawSplit.getBytes();
+ }
+ currentTaskId = taskId;
+ String groomName = groomStatus.getGroomHostName();
+ Task t = new BSPTask(jobId, jobFile, taskId, partition, splitClass, split);
+ activeTasks.put(taskId, groomName);
+ myGroomStatus = groomStatus;
+ return t;
+ }
+
+ // /* Remove */
+ // private Task getGroomForTask(TaskAttemptID taskid,
+ // Map<String, GroomServerStatus> grooms,
+ // Map<GroomServerStatus, Integer> tasksInGroomMap) {
+ // String splitClass = null;
+ // BytesWritable split = null;
+ // Task t = null;
+ // if (rawSplit != null) {
+ // splitClass = rawSplit.getClassName();
+ // split = rawSplit.getBytes();
+ // String[] possibleLocations = rawSplit.getLocations();
+ // String groomName = getGroomToSchedule(taskid, grooms, tasksInGroomMap,
+ // possibleLocations);
+ // if (groomName != null) {
+ // t = new BSPTask(jobId, jobFile, taskid, partition, splitClass, split);
+ // activeTasks.put(taskid, groomName);
+ // myGroomStatus = grooms.get(groomName);
+ // }
+ // }
+ //
+ // if (t == null) {
+ // String groomName = getAnyGroomToSchedule(grooms, tasksInGroomMap);
+ // if (groomName != null) {
+ // t = new BSPTask(jobId, jobFile, taskid, partition, splitClass, split);
+ // activeTasks.put(taskid, groomName);
+ // myGroomStatus = grooms.get(groomName);
+ // }
+ // }
+ //
+ // return t;
+ // }
+
+ /**
+ * Places a recovery attempt on a groom server. The preferred hosts are tried
+ * first; otherwise any groom with a free slot is used.
+ *
+ * @param taskid attempt id of the recovery task
+ * @param grooms known groom servers
+ * @param tasksInGroomMap number of tasks already assigned per groom
+ * @param possibleLocations preferred host names, checked in order
+ * @return the recovery task, or null if no groom has a free slot
+ */
+ private Task getGroomForRecoverTaskInHosts(TaskAttemptID taskid,
+ Map<String, GroomServerStatus> grooms,
+ Map<GroomServerStatus, Integer> tasksInGroomMap,
+ String[] possibleLocations) {
+ String splitClass = null;
+ BytesWritable split = null;
Task t = null;
+ // Attach the input split, if any (the replaced getTaskToRun did the same).
+ if (rawSplit != null) {
+ splitClass = rawSplit.getClassName();
+ split = rawSplit.getBytes();
+ }
+ String groomName = getGroomToSchedule(taskid, grooms, tasksInGroomMap,
+ possibleLocations);
+ if (groomName == null) {
+ // No preferred host has capacity; fall back to any groom with a free slot.
+ groomName = getAnyGroomToSchedule(grooms, tasksInGroomMap);
+ }
+ if (groomName != null) {
+ t = new BSPTask(jobId, jobFile, taskid, partition, splitClass, split);
+ activeTasks.put(taskid, groomName);
+ myGroomStatus = grooms.get(groomName);
+ }
+ return t;
+ }
+
+ /**
+ * Creates a recovery attempt for this TIP. Before the new attempt is
+ * computed, the slot counted for the groom recorded in myGroomStatus is
+ * released in {@code tasksInGroomMap}.
+ *
+ * @param grooms known groom servers
+ * @param tasksInGroomMap number of tasks assigned per groom; updated in place
+ * @param hostNames preferred host names for the recovery attempt
+ * @return the recovery task, or null if no attempt id or no groom is available
+ */
+ public Task getRecoveryTask(Map<String, GroomServerStatus> grooms,
+ Map<GroomServerStatus, Integer> tasksInGroomMap, String[] hostNames)
+ throws IOException {
+ Integer previousCount = tasksInGroomMap.get(myGroomStatus);
+ if (previousCount != null) {
+ tasksInGroomMap.put(myGroomStatus, previousCount - 1);
+ }
+
+ TaskAttemptID recoveryId = computeTaskId();
+ LOG.debug("Recovering task = " + String.valueOf(recoveryId));
+ return (recoveryId == null) ? null : getGroomForRecoverTaskInHosts(
+ recoveryId, grooms, tasksInGroomMap, hostNames);
+ }
+
+ /**
+ * Tells whether another attempt of this TIP may still be started.
+ *
+ * @return true while nextTaskId is below MAX_TASK_EXECS + maxTaskAttempts,
+ * the same limit computeTaskId enforces
+ */
+ public boolean canStartTask() {
+ return nextTaskId < MAX_TASK_EXECS + maxTaskAttempts;
+ }
+
+ private TaskAttemptID computeTaskId() {
TaskAttemptID taskid = null;
if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts)) {
int attemptId = job.getNumRestarts() * NUM_ATTEMPTS_PER_RESTART
@@ -135,54 +306,20 @@ class TaskInProgress {
+ " attempts for the tip '" + getTIPId() + "'");
return null;
}
-
- String splitClass = null;
- BytesWritable split = null;
- GroomServerStatus selectedGroom = null;
- if (rawSplit != null) {
- splitClass = rawSplit.getClassName();
- split = rawSplit.getBytes();
- String[] possibleLocations = rawSplit.getLocations();
- for (int i = 0; i < possibleLocations.length; ++i) {
- String location = possibleLocations[i];
- GroomServerStatus groom = grooms.get(location);
- if (groom == null) {
- LOG.error("Could not find groom for location: " + location
- + " ; active grooms: " + grooms.keySet());
- continue;
- }
- Integer taskInGroom = tasksInGroomMap.get(groom);
- taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
- if (taskInGroom < groom.getMaxTasks()
- && location.equals(groom.getGroomHostName())) {
- selectedGroom = groom;
- t = new BSPTask(jobId, jobFile, taskid, partition, splitClass, split);
- activeTasks.put(taskid, groom.getGroomName());
-
- break;
- }
- }
- }
- // Failed in attempt to get data locality or there was no input split.
- if (selectedGroom == null) {
- Iterator<String> groomIter = grooms.keySet().iterator();
- while (groomIter.hasNext()) {
- GroomServerStatus groom = grooms.get(groomIter.next());
- Integer taskInGroom = tasksInGroomMap.get(groom);
- taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
- if (taskInGroom < groom.getMaxTasks()) {
- selectedGroom = groom;
- t = new BSPTask(jobId, jobFile, taskid, partition, splitClass, split);
- activeTasks.put(taskid, groom.getGroomName());
- }
- }
- }
-
- myGroomStatus = selectedGroom;
-
- return t;
+ return taskid;
}
+ // /** Remove */
+ // public Task getTaskToRun(Map<String, GroomServerStatus> grooms,
+ // Map<GroomServerStatus, Integer> tasksInGroomMap) throws IOException {
+ // TaskAttemptID taskId = computeTaskId();
+ // if (taskId == null) {
+ // return null;
+ // } else {
+ // return getGroomForTask(taskId, grooms, tasksInGroomMap);
+ // }
+ // }
+
// //////////////////////////////////
// Accessors
// //////////////////////////////////
@@ -344,20 +481,6 @@ class TaskInProgress {
}
/**
- * @param bspMaster the bspMaster to set
- */
- public void setBspMaster(BSPMaster bspMaster) {
- this.bspMaster = bspMaster;
- }
-
- /**
- * @return the bspMaster
- */
- public BSPMaster getBspMaster() {
- return bspMaster;
- }
-
- /**
* Set the event number that was raised for this tip
*/
public void setSuccessEventNumber(int eventNumber) {
@@ -382,4 +505,22 @@ class TaskInProgress {
return taskStatuses.get(taskid).getGroomServer();
}
+ /**
+ * @return the superstep recorded for this TIP, or -1 if none was set
+ */
+ public int getSuperstep() {
+ return this.mySuperstep;
+ }
+
+ /**
+ * @param superstep the superstep to record for this TIP
+ */
+ public void setSuperstep(int superstep) {
+ mySuperstep = superstep;
+ }
+
+ // TODO: In future this should be extended to the list of resources that the
+ // task requires.
+ /**
+ * @return the raw input split of this TIP, or null if it has none
+ */
+ public RawSplit getFileSplit() {
+ return rawSplit;
+ }
+
+ /**
+ * @return the attempt id last assigned by constructTask, or null if none
+ */
+ public TaskAttemptID getCurrentTaskAttemptId() {
+ return currentTaskId;
+ }
+
}
Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/TaskRunner.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/TaskRunner.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/TaskRunner.java Sun Aug 5 11:07:48 2012
@@ -122,8 +122,9 @@ public class TaskRunner extends Thread {
int exit_code = bspProcess.waitFor();
if (!bspKilled && exit_code != 0) {
+
throw new IOException("BSP task process exit with nonzero status of "
- + exit_code + ".");
+ + exit_code + ". command = " + commands);
}
} catch (InterruptedException e) {
LOG.warn("Thread is interrupted when execeuting BSP process.", e);
@@ -223,6 +224,17 @@ public class TaskRunner extends Thread {
vargs.add(Integer.toString(addr.getPort()));
vargs.add(task.getTaskID().toString());
vargs.add(groomServer.groomHostName);
+ vargs.add(Long.toString(groomServer.getStartSuperstep(task.getTaskID())));
+ TaskStatus status = groomServer.getTaskStatus(task.getTaskID());
+
+ if(status != null &&
+ TaskStatus.State.RECOVERING.equals(status.getRunState())){
+ vargs.add(TaskStatus.State.RECOVERING.name());
+ }
+ else{
+ vargs.add(TaskStatus.State.RUNNING.name());
+ }
+
}
return vargs;
}
@@ -285,6 +297,7 @@ public class TaskRunner extends Thread {
if (bspProcess != null) {
bspProcess.destroy();
}
+
}
/**
Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/TaskStatus.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/TaskStatus.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/TaskStatus.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/TaskStatus.java Sun Aug 5 11:07:48 2012
@@ -37,13 +37,14 @@ public class TaskStatus implements Writa
// enumeration for reporting current phase of a task.
+ // NOTE(review): RECOVERING presumably marks a task restoring its state after
+ // a fault; confirm. Keep new constants appended in case ordinals are persisted.
public static enum Phase {
- STARTING, COMPUTE, BARRIER_SYNC, CLEANUP
+ STARTING, COMPUTE, BARRIER_SYNC, CLEANUP, RECOVERING
}
// what state is the task in?
+ // NOTE(review): FAULT_NOTIFIED, RECOVERY_SCHEDULING, RECOVERY_SCHEDULED and
+ // RECOVERING appear to form the fault-recovery lifecycle of a task (TaskRunner
+ // passes RECOVERING to the child process); confirm the exact transitions.
public static enum State {
RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, COMMIT_PENDING,
- FAILED_UNCLEAN, KILLED_UNCLEAN
+ FAILED_UNCLEAN, KILLED_UNCLEAN, FAULT_NOTIFIED, RECOVERY_SCHEDULING,
+ RECOVERY_SCHEDULED, RECOVERING
}
private BSPJobID jobId;
Added: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/UpdatePeerAction.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/UpdatePeerAction.java?rev=1369551&view=auto
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/UpdatePeerAction.java (added)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/UpdatePeerAction.java Sun Aug 5 11:07:48 2012
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster}
+ * to the {@link org.apache.hama.bsp.GroomServer} to update, for the task
+ * {@code taskId}, the information about its peer task {@code peerTaskId}.
+ * NOTE(review): {@code groomName} is presumably the groom hosting the peer;
+ * confirm against the GroomServer handler for UPDATE_PEER.
+ */
+class UpdatePeerAction extends GroomServerAction {
+ TaskAttemptID taskId;
+ TaskAttemptID peerTaskId;
+ Text groomName;
+
+ /**
+ * No-argument constructor. Every field is initialized because
+ * {@link #readFields(DataInput)} deserializes into all of them in place.
+ */
+ public UpdatePeerAction() {
+ super(ActionType.UPDATE_PEER);
+ taskId = new TaskAttemptID();
+ peerTaskId = new TaskAttemptID();
+ groomName = new Text("");
+ }
+
+ public UpdatePeerAction(TaskAttemptID taskId, TaskAttemptID peerTaskId,
+ String groom) {
+ super(ActionType.UPDATE_PEER);
+ this.taskId = taskId;
+ this.peerTaskId = peerTaskId;
+ this.groomName = new Text(groom);
+ }
+
+ /** @return the task attempt id carried by this action */
+ public TaskAttemptID getTaskID() {
+ return taskId;
+ }
+
+ /** @return the peer task attempt id carried by this action */
+ public TaskAttemptID getPeerTaskID() {
+ return peerTaskId;
+ }
+
+ /** @return the groom name carried by this action */
+ public String getGroomName() {
+ return groomName.toString();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ taskId.write(out);
+ peerTaskId.write(out);
+ groomName.write(out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ taskId.readFields(in);
+ peerTaskId.readFields(in);
+ groomName.readFields(in);
+ }
+}