You are viewing a plain-text version of this content; the link to the canonical version is not preserved in this copy.
Posted to commits@hama.apache.org by ed...@apache.org on 2012/03/05 00:08:08 UTC

svn commit: r1296893 - in /incubator/hama/trunk: ./ conf/ core/src/main/java/org/apache/hama/ core/src/main/java/org/apache/hama/bsp/ core/src/test/java/org/apache/hama/bsp/ yarn/src/main/java/org/apache/hama/bsp/

Author: edwardyoon
Date: Sun Mar  4 23:08:07 2012
New Revision: 1296893

URL: http://svn.apache.org/viewvc?rev=1296893&view=rev
Log:
BSPTask should periodically ping its parent

Added:
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java   (with props)
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/conf/hama-default.xml
    incubator/hama/trunk/core/src/main/java/org/apache/hama/Constants.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1296893&r1=1296892&r2=1296893&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Sun Mar  4 23:08:07 2012
@@ -12,6 +12,7 @@ Release 0.5 - Unreleased
 
   IMPROVEMENTS
 
+    HAMA-498: BSPTask should periodically ping its parent (Suraj Menon via edwardyoon)
     HAMA-513: Move message classes to somewhere from bsp package. (tjungblut)
     HAMA-484: Counters should be accessible in client (tjungblut)    
     HAMA-483: Remove old and deprecated BSP API (tjungblut)    

Modified: incubator/hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/conf/hama-default.xml?rev=1296893&r1=1296892&r2=1296893&view=diff
==============================================================================
--- incubator/hama/trunk/conf/hama-default.xml (original)
+++ incubator/hama/trunk/conf/hama-default.xml Sun Mar  4 23:08:07 2012
@@ -60,6 +60,11 @@
     interface.</description>
   </property>
   <property>
+    <name>bsp.groomserver.pingperiod</name>
+    <value>5000</value>
+    <description>Maximum interval, in milliseconds, between the heartbeat pings that every BSP task must send to its groom server. If a task fails to ping within this period, the groom server deems the task failed.</description>
+  </property>
+  <property>
     <name>bsp.system.dir</name>
     <value>${hadoop.tmp.dir}/bsp/system</value>
     <description>The shared directory where BSP stores control files.
@@ -104,6 +109,17 @@
     <description>The maximum number of BSP tasks that will be run simultaneously 
     by a groom server.</description>
   </property>
+   <property>
+    <name>bsp.checkpoint.enabled</name>
+    <value>false</value>
+    <description>Enable Hama to checkpoint the messages transferred among BSP tasks during the BSP synchronization period.</description>
+  </property>
+  <property>
+    <name>bsp.checkpoint.interval</name>
+    <value>1</value>
+    <description>If bsp.checkpoint.enabled is set to true, a checkpoint is taken every N synchronizations (supersteps) of the BSP tasks, where N is this value.</description>
+  </property>
+
   
   <!--
   Beginning of properties that are directly mapped from ZooKeeper's zoo.cfg.
@@ -161,7 +177,7 @@
   </property>
   
   <property>
-    <name>hama.messanger.class</name>
+    <name>hama.messenger.class</name>
     <value>org.apache.hama.bsp.message.AvroMessageManagerImpl</value>
   </property>
   

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/Constants.java?rev=1296893&r1=1296892&r2=1296893&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/Constants.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/Constants.java Sun Mar  4 23:08:07 2012
@@ -23,8 +23,8 @@ package org.apache.hama;
  * Some constants used in the Hama.
  */
 public interface Constants {
- 
-  public  static final String GROOM_RPC_HOST = "bsp.groom.rpc.hostname";
+
+  public static final String GROOM_RPC_HOST = "bsp.groom.rpc.hostname";
 
   public static final String DEFAULT_GROOM_RPC_HOST = "0.0.0.0";
 
@@ -32,13 +32,12 @@ public interface Constants {
 
   /** Default port region rpc server listens on. */
   public static final int DEFAULT_GROOM_RPC_PORT = 50000;
-  
 
-  ///////////////////////////////////////
+  // /////////////////////////////////////
   // Constants for BSP Package
-  ///////////////////////////////////////
+  // /////////////////////////////////////
   /** default host address */
-  public  static final String PEER_HOST = "bsp.peer.hostname";
+  public static final String PEER_HOST = "bsp.peer.hostname";
   /** default host address */
   public static final String DEFAULT_PEER_HOST = "0.0.0.0";
 
@@ -47,18 +46,35 @@ public interface Constants {
   public static final int DEFAULT_PEER_PORT = 61000;
 
   public static final String PEER_ID = "bsp.peer.id";
-  
+
   /** Parameter name for what groom server implementation to use. */
-  public static final String GROOM_SERVER_IMPL= "hama.groomserver.impl";
-  
+  public static final String GROOM_SERVER_IMPL = "hama.groomserver.impl";
+
+  /** Parameter name for interval at which bsp peer should ping groomserver */
+  public static final String GROOM_PING_PERIOD = "bsp.groomserver.pingperiod";
+
+  /** Default value of ping period in milliseconds. */
+  public static final long DEFAULT_GROOM_PING_PERIOD = 5000;
+
   /** When we encode strings, we always specify UTF8 encoding */
   public static final String UTF8_ENCODING = "UTF-8";
-  
+
   public static final String MAX_TASKS_PER_GROOM = "bsp.tasks.maximum";
-  
-  ///////////////////////////////////////
+
+  // //////////////////////////////////////
+  // Checkpointing related constants
+  // //////////////////////////////////////
+
+  // Set to true to enable checkpointing.
+  public static final String CHECKPOINT_ENABLED = "bsp.checkpoint.enabled";
+  // Superstep interval at which BSPPeer should initiate a checkpoint.
+  public static final String CHECKPOINT_INTERVAL = "bsp.checkpoint.interval";
+  // By default checkpointing when enabled would checkpoint on every superstep
+  public static final short DEFAULT_CHECKPOINT_INTERVAL = 1;
+
+  // /////////////////////////////////////
   // Constants for ZooKeeper
-  ///////////////////////////////////////  
+  // /////////////////////////////////////
   /** zookeeper root */
   public static final String ZOOKEEPER_ROOT = "bsp.zookeeper.root";
   /** zookeeper default root */
@@ -76,7 +92,7 @@ public interface Constants {
   public static final String ZOOKEEPER_PAUSE = "zookeeper.pause";
   /** Default ZooKeeper pause value. In milliseconds. */
   public static final int DEFAULT_ZOOKEEPER_PAUSE = 2 * 1000;
-  
+
   static final String ZOOKEEPER_CONFIG_NAME = "zoo.cfg";
   static final String ZOOKEEPER_CLIENT_PORT = "hama.zookeeper.property.clientPort";
   static final String ZOOKEEPER_SESSION_TIMEOUT = "hama.zookeeper.session.timeout";
@@ -87,11 +103,10 @@ public interface Constants {
   /** Cluster is fully-distributed */
   static final String CLUSTER_IS_DISTRIBUTED = "true";
 
-
   // Other constants
 
   /**
    * An empty instance.
    */
-  static final byte [] EMPTY_BYTE_ARRAY = new byte [0];
+  static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1296893&r1=1296892&r2=1296893&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Sun Mar  4 23:08:07 2012
@@ -27,6 +27,7 @@ import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
 import org.apache.hama.bsp.message.compress.BSPMessageCompressorFactory;
@@ -381,4 +382,12 @@ public class BSPJob extends BSPJobContex
   public void setMaxIteration(int maxIteration) {
     conf.setInt("hama.graph.max.iteration", maxIteration);
   }
+
+  public void setCheckPointInterval(int checkPointInterval) {
+    conf.setInt(Constants.CHECKPOINT_INTERVAL, checkPointInterval);
+  }
+
+  public void setCheckPointFlag(boolean enableCheckPoint) {
+    conf.setBoolean(Constants.CHECKPOINT_ENABLED, enableCheckPoint);
+  }
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1296893&r1=1296892&r2=1296893&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Sun Mar  4 23:08:07 2012
@@ -70,6 +70,10 @@ public final class BSPPeerImpl<K1, V1, K
   private SyncClient syncClient;
   private MessageManager<M> messenger;
 
+  // A checkpoint is initiated at the <checkPointInterval>th interval.
+  private int checkPointInterval;
+  private long lastCheckPointStep;
+
   // IO
   private int partition;
   private String splitClass;
@@ -125,6 +129,10 @@ public final class BSPPeerImpl<K1, V1, K
 
     this.fs = FileSystem.get(conf);
 
+    this.checkPointInterval = conf.getInt(Constants.CHECKPOINT_INTERVAL,
+        Constants.DEFAULT_CHECKPOINT_INTERVAL);
+    this.lastCheckPointStep = 0;
+
     String bindAddress = conf.get(Constants.PEER_HOST,
         Constants.DEFAULT_PEER_HOST);
     int bindPort = conf
@@ -206,6 +214,25 @@ public final class BSPPeerImpl<K1, V1, K
     messenger.send(peerName, msg);
   }
 
+  /*
+   * returns true if the peer would checkpoint in the next sync.
+   */
+  public final boolean isReadyToCheckpoint() {
+
+    checkPointInterval = conf.getInt(Constants.CHECKPOINT_INTERVAL, 1);
+    if (LOG.isDebugEnabled())
+      LOG.debug(new StringBuffer(1000).append("Enabled = ")
+          .append(conf.getBoolean(Constants.CHECKPOINT_ENABLED, false))
+          .append(" checkPointInterval = ").append(checkPointInterval)
+          .append(" lastCheckPointStep = ").append(lastCheckPointStep)
+          .append(" getSuperstepCount() = ").append(getSuperstepCount())
+          .toString());
+
+    return (conf.getBoolean(Constants.CHECKPOINT_ENABLED, false)
+        && (checkPointInterval != 0) && (((int) (getSuperstepCount() - lastCheckPointStep)) >= checkPointInterval));
+
+  }
+
   private final String checkpointedPath() {
     String backup = conf.get("bsp.checkpoint.prefix_path", "/checkpoint/");
     String ckptPath = backup + bspJob.getJobID().toString() + "/"
@@ -243,6 +270,12 @@ public final class BSPPeerImpl<K1, V1, K
     Iterator<Entry<InetSocketAddress, LinkedList<M>>> it = messenger
         .getMessageIterator();
 
+    boolean shouldCheckPoint = false;
+
+    if ((shouldCheckPoint = isReadyToCheckpoint())) {
+      lastCheckPointStep = getSuperstepCount();
+    }
+
     while (it.hasNext()) {
       Entry<InetSocketAddress, LinkedList<M>> entry = it.next();
       final InetSocketAddress addr = entry.getKey();
@@ -250,7 +283,7 @@ public final class BSPPeerImpl<K1, V1, K
 
       final BSPMessageBundle<M> bundle = combineMessages(messages);
 
-      if (conf.getBoolean("bsp.checkpoint.enabled", false)) {
+      if (shouldCheckPoint) {
         checkpoint(checkpointedPath(), bundle);
       }
 

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java?rev=1296893&r1=1296892&r2=1296893&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java Sun Mar  4 23:08:07 2012
@@ -20,6 +20,10 @@ package org.apache.hama.bsp;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Calendar;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -27,7 +31,7 @@ import org.apache.hadoop.io.BytesWritabl
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.Constants;
 import org.apache.hama.ipc.BSPPeerProtocol;
 
 /**
@@ -41,7 +45,53 @@ public final class BSPTask extends Task 
   BytesWritable split;
   String splitClass;
 
+  // Schedule Heartbeats to GroomServer
+  private ScheduledExecutorService pingService;
+
+  /**
+   * This thread is responsible for sending a heartbeat ping to GroomServer.
+   */
+  private static class PingGroomServer implements Runnable {
+
+    private BSPPeerProtocol pingRPC;
+    private TaskAttemptID taskId;
+    private Thread bspThread;
+
+    public PingGroomServer(BSPPeerProtocol umbilical, TaskAttemptID id) {
+      pingRPC = umbilical;
+      taskId = id;
+      bspThread = Thread.currentThread();
+    }
+
+    @Override
+    public void run() {
+
+      boolean shouldKillSelf = false;
+      try {
+        if (LOG.isDebugEnabled())
+          LOG.debug("Pinging at time " + Calendar.getInstance().toString());
+        // if the RPC call returns false, it means that groomserver does not
+        // have knowledge of this task.
+        shouldKillSelf = !(pingRPC.ping(taskId) && bspThread.isAlive());
+      } catch (IOException e) {
+        LOG.error(new StringBuilder(
+            "IOException pinging GroomServer from task - ").append(taskId), e);
+        shouldKillSelf = true;
+      } catch (Exception e) {
+        LOG.error(new StringBuilder(
+            "Exception pinging GroomServer from task - ").append(taskId), e);
+        shouldKillSelf = true;
+      }
+      if (shouldKillSelf) {
+        LOG.error("Killing self. No connection to groom.");
+        System.exit(69);
+      }
+
+    }
+  }
+
   public BSPTask() {
+    this.pingService = Executors.newScheduledThreadPool(1);
   }
 
   public BSPTask(BSPJobID jobId, String jobFile, TaskAttemptID taskid,
@@ -53,6 +103,8 @@ public final class BSPTask extends Task 
 
     this.splitClass = splitClass;
     this.split = split;
+
+    this.pingService = Executors.newScheduledThreadPool(1);
   }
 
   @Override
@@ -60,31 +112,82 @@ public final class BSPTask extends Task 
     return new BSPTaskRunner(this, groom, this.conf);
   }
 
+  private void startPingingGroom(BSPJob job, BSPPeerProtocol umbilical) {
+
+    long pingPeriod = job.getConf().getLong(Constants.GROOM_PING_PERIOD,
+        Constants.DEFAULT_GROOM_PING_PERIOD) / 2;
+
+    try {
+      if (pingPeriod > 0) {
+        pingService.scheduleWithFixedDelay(new PingGroomServer(umbilical,
+            taskId), 0, pingPeriod, TimeUnit.MILLISECONDS);
+        LOG.error("Started pinging to groom");
+      }
+    } catch (Exception e) {
+      LOG.error("Error scheduling ping service", e);
+    }
+  }
+
+  private void stopPingingGroom() {
+    if (pingService != null) {
+      LOG.error("Shutting down ping service.");
+      pingService.shutdownNow();
+    }
+  }
+
   @Override
   public final void run(BSPJob job, BSPPeerImpl<?, ?, ?, ?, ?> bspPeer,
-      BSPPeerProtocol umbilical) throws IOException, SyncException,
-      ClassNotFoundException, InterruptedException {
-    runBSP(job, bspPeer, split, umbilical);
+      BSPPeerProtocol umbilical) throws Exception {
+
+    startPingingGroom(job, umbilical);
+    try {
+      runBSP(job, bspPeer, split, umbilical);
+      done(umbilical);
+    } finally {
+      stopPingingGroom();
+    }
 
-    done(umbilical);
   }
 
   @SuppressWarnings("unchecked")
   private final <KEYIN, VALUEIN, KEYOUT, VALUEOUT, M extends Writable> void runBSP(
-      final BSPJob job, BSPPeerImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> bspPeer,
+      final BSPJob job,
+      BSPPeerImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> bspPeer,
       final BytesWritable rawSplit, final BSPPeerProtocol umbilical)
-      throws IOException, SyncException, ClassNotFoundException,
-      InterruptedException {
+      throws Exception {
 
     BSP<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> bsp = (BSP<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M>) ReflectionUtils
         .newInstance(job.getConf().getClass("bsp.work.class", BSP.class),
             job.getConf());
 
-    bsp.setup(bspPeer);
-    bsp.bsp(bspPeer);
-
-    bsp.cleanup(bspPeer);
-    bspPeer.close();
+    // The policy is to throw the first exception and log the remaining.
+    Exception firstException = null;
+    try {
+      bsp.setup(bspPeer);
+      bsp.bsp(bspPeer);
+    } catch (Exception e) {
+      LOG.error("Error running bsp setup and bsp function.", e);
+      firstException = e;
+    } finally {
+      try {
+        bsp.cleanup(bspPeer);
+      } catch (Exception e) {
+        LOG.error("Error cleaning up after bsp executed.", e);
+        if (firstException == null)
+          firstException = e;
+      } finally {
+
+        try {
+          bspPeer.close();
+        } catch (Exception e) {
+          LOG.error("Error closing BSP Peer.", e);
+          if (firstException == null)
+            firstException = e;
+        }
+        if (firstException != null)
+          throw firstException;
+      }
+    }
   }
 
   public final BSPJob getConf() {

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1296893&r1=1296892&r2=1296893&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Sun Mar  4 23:08:07 2012
@@ -24,8 +24,10 @@ import java.io.PrintStream;
 import java.lang.reflect.Constructor;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Calendar;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -34,7 +36,10 @@ import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -48,9 +53,9 @@ import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.sync.SyncException;
@@ -133,6 +138,9 @@ public class GroomServer implements Runn
   // private BlockingQueue<GroomServerAction> tasksToCleanup = new
   // LinkedBlockingQueue<GroomServerAction>();
 
+  // Periodically checks for tasks that have stopped pinging this groom server.
+  private ScheduledExecutorService taskMonitorService;
+
   private class DispatchTasksHandler implements DirectiveHandler {
 
     public void handle(Directive directive) throws DirectiveException {
@@ -213,6 +221,49 @@ public class GroomServer implements Runn
     }
   }
 
+  /*
+   * This thread is responsible for monitoring the pings from peers. If any peer
+   * fails to ping this groom for a pre-defined period, the task is purged from
+   * the records.
+   */
+  private class BSPTasksMonitor extends Thread {
+
+    private List<TaskInProgress> outOfContactTasks = new ArrayList<GroomServer.TaskInProgress>(
+        conf.getInt(Constants.MAX_TASKS_PER_GROOM, 3));
+
+    private BSPTasksMonitor() {
+
+      outOfContactTasks = new ArrayList<GroomServer.TaskInProgress>(
+          conf.getInt(Constants.MAX_TASKS_PER_GROOM, 3));
+    }
+
+    public void run() {
+
+      getObliviousTasks(outOfContactTasks);
+
+      if (outOfContactTasks.size() > 0) {
+        LOG.debug("Got " + outOfContactTasks.size() + " oblivious tasks");
+      }
+
+      Iterator<TaskInProgress> taskIter = outOfContactTasks.iterator();
+
+      while (taskIter.hasNext()) {
+        TaskInProgress tip = taskIter.next();
+        try {
+          LOG.debug("Purging task " + tip);
+          purgeTask(tip, true);
+        } catch (Exception e) {
+          LOG.error(
+              new StringBuilder("Error while removing a timed-out task - ")
+                  .append(tip.toString()), e);
+
+        }
+      }
+      outOfContactTasks.clear();
+
+    }
+  }
+
   public GroomServer(Configuration conf) throws IOException {
     LOG.info("groom start");
     this.conf = conf;
@@ -322,7 +373,17 @@ public class GroomServer implements Runn
         new DispatchTasksHandler());
     instructor.start();
 
-    if(conf.getBoolean("bsp.monitor.enabled", true)) {
+    if (this.taskMonitorService == null) {
+      this.taskMonitorService = Executors.newScheduledThreadPool(1);
+      long monitorPeriod = this.conf.getLong(Constants.GROOM_PING_PERIOD,
+          Constants.DEFAULT_GROOM_PING_PERIOD);
+      if (monitorPeriod > 0) {
+        this.taskMonitorService.scheduleWithFixedDelay(new BSPTasksMonitor(),
+            1000, monitorPeriod, TimeUnit.MILLISECONDS);
+      }
+    }
+
+    if (conf.getBoolean("bsp.monitor.enabled", true)) {
       // TODO: conf.get("bsp.monitor.class.impl", "Monitor.class")
       // so user can switch to customized monitor impl if necessary.
       new Monitor(conf, zk, this.groomServerName).start();
@@ -608,6 +669,50 @@ public class GroomServer implements Runn
     }
   }
 
+  private synchronized void getObliviousTasks(
+      List<TaskInProgress> outOfContactTasks) {
+
+    if (runningTasks == null) {
+      LOG.debug("returning null");
+      return;
+    }
+
+    long currentTime = Calendar.getInstance().getTimeInMillis();
+    long monitorPeriod = conf.getLong(Constants.GROOM_PING_PERIOD,
+        Constants.DEFAULT_GROOM_PING_PERIOD);
+
+    for (Map.Entry<TaskAttemptID, TaskInProgress> entry : runningTasks
+        .entrySet()) {
+      TaskInProgress tip = entry.getValue();
+      if (LOG.isDebugEnabled())
+        LOG.debug("checking task: "
+            + tip.getTask().getTaskID()
+            + " starttime ="
+            + tip.startTime
+            + " lastping = "
+            + tip.lastPingedTimestamp
+            + " run state = "
+            + tip.taskStatus.getRunState().toString()
+            + " monitorPeriod = "
+            + monitorPeriod
+            + " check = "
+            + (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING) && (((tip.lastPingedTimestamp == 0 && ((currentTime - tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp > 0) && (currentTime - tip.lastPingedTimestamp) > monitorPeriod)))));
+
+      // Task is out of contact if it has not pinged since more than
+      // monitorPeriod. A task is given a leeway of 10 times monitorPeriod
+      // to get started.
+      if (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING)
+          && (((tip.lastPingedTimestamp == 0 && ((currentTime - tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp > 0) && (currentTime - tip.lastPingedTimestamp) > monitorPeriod)))) {
+
+        LOG.info("adding purge task: " + tip.getTask().getTaskID());
+
+        outOfContactTasks.add(tip);
+      }
+
+    }
+
+  }
+
   /**
    * The datastructure for initializing a job
    */
@@ -711,6 +816,11 @@ public class GroomServer implements Runn
       taskReportServer.stop();
       taskReportServer = null;
     }
+
+    if (taskMonitorService != null) {
+      taskMonitorService.shutdownNow();
+      taskMonitorService = null;
+    }
   }
 
   public static Thread startGroomServer(final GroomServer hrs) {
@@ -738,6 +848,9 @@ public class GroomServer implements Runn
     volatile boolean wasKilled = false;
     private TaskStatus taskStatus;
 
+    private long startTime = 0L;
+    private volatile long lastPingedTimestamp = 0L;
+
     public TaskInProgress(Task task, BSPJob jobConf, String groomServer) {
       this.task = task;
       this.jobConf = jobConf;
@@ -782,6 +895,7 @@ public class GroomServer implements Runn
       taskStatus.setRunState(TaskStatus.State.RUNNING);
       this.runner = task.createRunner(GroomServer.this);
       this.runner.start();
+      startTime = Calendar.getInstance().getTimeInMillis();
       LOG.info("Task '" + task.getTaskID().toString() + "' has started.");
     }
 
@@ -873,6 +987,10 @@ public class GroomServer implements Runn
         }
       }
     }
+
+    public synchronized void ping(long timestamp) {
+      this.lastPingedTimestamp = timestamp;
+    }
   }
 
   public boolean isRunning() {
@@ -983,6 +1101,7 @@ public class GroomServer implements Runn
       } catch (FSError e) {
         LOG.fatal("FSError from child", e);
         umbilical.fsError(taskid, e.getMessage());
+        e.printStackTrace();
       } catch (SyncException e) {
         LOG.fatal("SyncError from child", e);
         umbilical.fatalError(taskid, e.toString());
@@ -990,11 +1109,13 @@ public class GroomServer implements Runn
         // Report back any failures, for diagnostic purposes
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         e.printStackTrace(new PrintStream(baos));
+        e.printStackTrace();
       } catch (Throwable throwable) {
-        LOG.warn("Error running child", throwable);
+        LOG.fatal("Error running child", throwable);
         // Report back any failures, for diagnostic purposes
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         throwable.printStackTrace(new PrintStream(baos));
+        throwable.printStackTrace();
       } finally {
         RPC.stopProxy(umbilical);
         // Shutting down log4j of the child-vm...
@@ -1017,7 +1138,11 @@ public class GroomServer implements Runn
 
   @Override
   public boolean ping(TaskAttemptID taskid) throws IOException {
-    // TODO Auto-generated method stub
+    TaskInProgress tip = runningTasks.get(taskid);
+    if (tip != null) {
+      tip.ping(Calendar.getInstance().getTimeInMillis());
+      return true;
+    }
     return false;
   }
 

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1296893&r1=1296892&r2=1296893&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Sun Mar  4 23:08:07 2012
@@ -246,18 +246,39 @@ public class LocalBSPRunner implements J
         realBytes = splits[id].getBytes();
       }
 
-      peer = new BSPPeerImpl(job, conf, new TaskAttemptID(
-          new TaskID(job.getJobID(), id), id), new LocalUmbilical(), id,
-          splitname, realBytes, new Counters());
+      peer = new BSPPeerImpl(job, conf, new TaskAttemptID(new TaskID(
+          job.getJobID(), id), id), new LocalUmbilical(), id, splitname,
+          realBytes, new Counters());
+      // Throw the first exception and log all the other exception.
+      Exception firstException = null;
       try {
         bsp.setup(peer);
         bsp.bsp(peer);
       } catch (Exception e) {
         LOG.error("Exception during BSP execution!", e);
+        firstException = e;
+      } finally {
+        try {
+          bsp.cleanup(peer);
+        } catch (Exception e) {
+          LOG.error("Error cleaning up after bsp execution.", e);
+          if (firstException == null)
+            firstException = e;
+        } finally {
+          try {
+            peer.clear();
+            peer.close();
+          } catch (Exception e) {
+            LOG.error("Exception closing BSP peer,", e);
+            if (firstException == null)
+              firstException = e;
+          } finally {
+            if (firstException != null)
+              throw firstException;
+          }
+        }
+
       }
-      bsp.cleanup(peer);
-      peer.clear();
-      peer.close();
     }
 
     @Override

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java?rev=1296893&r1=1296892&r2=1296893&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java Sun Mar  4 23:08:07 2012
@@ -27,7 +27,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.ipc.BSPPeerProtocol;
 
 /**
@@ -48,7 +47,7 @@ public abstract class Task implements Wr
 
   // Current counters
   private transient Counters counters = new Counters();
-  
+
   public Task() {
     jobId = new BSPJobID();
     taskId = new TaskAttemptID();
@@ -98,15 +97,22 @@ public abstract class Task implements Wr
   public int getPartition() {
     return partition;
   }
-  
-  /** Construct output file names so that, when an output directory listing is
-   * sorted lexicographically, positions correspond to output partitions.*/
-  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+
+  /**
+   * Construct output file names so that, when an output directory listing is
+   * sorted lexicographically, positions correspond to output partitions.
+   * A fixed locale keeps the digits ASCII regardless of the default locale.
+   */
+  private static final NumberFormat NUMBER_FORMAT = NumberFormat
+      .getInstance(java.util.Locale.US);
   static {
     NUMBER_FORMAT.setMinimumIntegerDigits(5);
     NUMBER_FORMAT.setGroupingUsed(false);
   }
-  
+
+  /**
+   * Returns the output file name for a partition, e.g. {@code part-00003}.
+   */
   static synchronized String getOutputName(int partition) {
     return "part-" + NUMBER_FORMAT.format(partition);
   }
@@ -142,18 +148,30 @@ public abstract class Task implements Wr
    * @param bspPeer for communications
    * @param umbilical for communications with GroomServer
+   * @throws Exception if running the task fails
    */
-  public abstract void run(BSPJob job, BSPPeerImpl<?,?,?,?,?> bspPeer, BSPPeerProtocol umbilical)
-      throws IOException, SyncException, ClassNotFoundException, InterruptedException;
+  public abstract void run(BSPJob job, BSPPeerImpl<?, ?, ?, ?, ?> bspPeer,
+      BSPPeerProtocol umbilical) throws Exception;
 
   public abstract BSPTaskRunner createRunner(GroomServer groom);
 
+  /**
+   * Reports this task as done through {@code umbilical.done(getTaskID())}.
+   *
+   * @throws IOException if the umbilical call fails
+   */
   public void done(BSPPeerProtocol umbilical) throws IOException {
     umbilical.done(getTaskID());
   }
-  
+
+  /** Returns the job configuration of this task. */
   public abstract BSPJob getConf();
+
+  /** Sets the job configuration of this task. */
   public abstract void setConf(BSPJob localJobConf);
 
-  Counters getCounters() { return counters; }
-  
+  /** Returns the counters of this task. */
+  Counters getCounters() {
+    return counters;
+  }
+
 }

Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java?rev=1296893&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java (added)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java Sun Mar  4 23:08:07 2012
@@ -0,0 +1,567 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.sync.SyncClient;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.bsp.sync.SyncServiceFactory;
+import org.apache.hama.ipc.BSPPeerProtocol;
+
+/**
+ * Tests that a BSPTask running in a child JVM pings its parent (here a
+ * {@link MinimalGroomServer} RPC endpoint) while it runs, also when a fault is
+ * injected into setup, bsp or cleanup, and that the child exits by itself once
+ * the parent's RPC server has been stopped.
+ */
+public class TestBSPTaskFaults extends TestCase {
+
+  public static final Log LOG = LogFactory.getLog(TestBSPTaskFaults.class);
+
+  private static final int PORT = 54321;
+  public static final String TEST_POINT = "bsp.ft.test.point";
+
+  /** Upper bound, in milliseconds, on how long a test waits for the child. */
+  private static final long CHILD_TIMEOUT_MS = 20000L;
+
+  private volatile MinimalGroomServer groom;
+  private volatile BSPPeerProtocol umbilical;
+  private Server workerServer;
+
+  public volatile static HamaConfiguration conf;
+
+  private ScheduledExecutorService testBSPTaskService;
+
+  /**
+   * Stand-in for the GroomServer side of {@link BSPPeerProtocol}: accepts
+   * every call and records the pings it receives.
+   */
+  @SuppressWarnings("unused")
+  public static class MinimalGroomServer implements BSPPeerProtocol {
+
+    private volatile int pingCount;
+    private volatile long firstPingTime;
+    private volatile long lastPingTime;
+    private boolean isShutDown = false;
+    private boolean taskComplete = false;
+    private boolean errorCondition = false;
+    private Configuration conf;
+
+    public MinimalGroomServer(Configuration config) throws IOException {
+      conf = config;
+    }
+
+    @Override
+    public long getProtocolVersion(String protocol, long clientVersion)
+        throws IOException {
+      return BSPPeerProtocol.versionID;
+    }
+
+    @Override
+    public void close() throws IOException {
+      isShutDown = true;
+    }
+
+    @Override
+    public Task getTask(TaskAttemptID taskid) throws IOException {
+      return new BSPTask();
+    }
+
+    /**
+     * Records a ping. Synchronized so that the counter and the first/last ping
+     * timestamps are updated together with respect to the other accessors.
+     */
+    @Override
+    public synchronized boolean ping(TaskAttemptID taskid) throws IOException {
+      LOG.debug("Pinged");
+      long now = System.currentTimeMillis();
+      ++pingCount;
+      if (pingCount == 1) {
+        firstPingTime = now;
+      }
+      lastPingTime = now;
+      return true;
+    }
+
+    @Override
+    public void done(TaskAttemptID taskid) throws IOException {
+      taskComplete = true;
+    }
+
+    @Override
+    public void fsError(TaskAttemptID taskId, String message)
+        throws IOException {
+      errorCondition = true;
+    }
+
+    @Override
+    public void fatalError(TaskAttemptID taskId, String message)
+        throws IOException {
+      errorCondition = true;
+    }
+
+    @Override
+    public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
+        throws IOException, InterruptedException {
+      return true;
+    }
+
+    @Override
+    public int getAssignedPortNum(TaskAttemptID taskid) {
+      return 0;
+    }
+
+    public synchronized int getPingCount() {
+      return pingCount;
+    }
+
+    /** Returns the time of the first recorded ping, or 0 if there was none. */
+    public synchronized long getFirstPingTime() {
+      return firstPingTime;
+    }
+
+    /** Returns the time of the latest recorded ping, or 0 if there was none. */
+    public synchronized long getLastPingTime() {
+      return lastPingTime;
+    }
+
+    /**
+     * Overwrites the ping counter; resetting it to 0 also clears the recorded
+     * ping timestamps.
+     */
+    public synchronized void setPingCount(int pingCount) {
+      this.pingCount = pingCount;
+      if (pingCount == 0) {
+        firstPingTime = 0L;
+        lastPingTime = 0L;
+      }
+    }
+  }
+
+  /**
+   * Runs {@link #main(String[])} in a child JVM, so that a BSPTask exiting
+   * its own process can be observed; {@link #call()} returns the child's exit
+   * code, or -1 if it could not be obtained.
+   */
+  public static class TestBSPProcessRunner implements Callable<Integer> {
+    private final ScheduledExecutorService sched;
+    private final AtomicReference<ScheduledFuture<Integer>> future;
+    // Set by the thread running call() and read by destroyProcess().
+    private volatile Process bspTaskProcess;
+    private Thread errorLog;
+    private Thread infoLog;
+
+    TestBSPProcessRunner() {
+      sched = Executors.newScheduledThreadPool(1);
+      future = new AtomicReference<ScheduledFuture<Integer>>();
+      bspTaskProcess = null;
+    }
+
+    /** Logs every line read from {@code input}, closing it afterwards. */
+    private void readStream(InputStream input) throws IOException {
+      BufferedReader reader = new BufferedReader(new InputStreamReader(input));
+      try {
+        String line;
+        while ((line = reader.readLine()) != null) {
+          LOG.info(line);
+        }
+      } finally {
+        reader.close();
+      }
+    }
+
+    /** Starts a thread that drains {@code in} into the log. */
+    private Thread startLogPump(final InputStream in) {
+      Thread pump = new Thread() {
+        @Override
+        public void run() {
+          try {
+            readStream(in);
+          } catch (Exception e) {
+            // Best effort: the pipe may break when the child is destroyed.
+          }
+        }
+      };
+      pump.start();
+      return pump;
+    }
+
+    public void startBSPProcess() {
+      this.future.set(this.sched.schedule(this, 0, SECONDS));
+      LOG.debug("Start building BSPPeer process.");
+    }
+
+    public int getBSPExitCode() {
+      try {
+        return this.future.get().get();
+      } catch (Exception e) {
+        LOG.error("Error while fetching exit status from BSPTask", e);
+      }
+      return -1;
+    }
+
+    /** Kills the child JVM if it has been started; otherwise does nothing. */
+    public void destroyProcess() {
+      Process process = bspTaskProcess;
+      if (process != null) {
+        process.destroy();
+      }
+    }
+
+    @Override
+    public Integer call() throws Exception {
+      String workDir = new File(".").getAbsolutePath();
+      // Use the same JVM as the parent process.
+      File jvm = new File(new File(System.getProperty("java.home"), "bin"),
+          "java");
+
+      // Start with the same classpath as the parent process.
+      StringBuilder classPath = new StringBuilder();
+      classPath.append(System.getProperty("java.class.path"));
+      classPath.append(File.pathSeparator);
+      classPath.append(new File(workDir, "core/target/test-classes"));
+      classPath.append(File.pathSeparator);
+      classPath.append(workDir);
+
+      int testPoint = conf.getInt(TEST_POINT, 0);
+      List<String> commands = new ArrayList<String>();
+      commands.add(jvm.toString());
+      commands.add("-classpath");
+      commands.add(classPath.toString());
+      commands.add(TestBSPProcessRunner.class.getName());
+      commands.add(Integer.toString(testPoint));
+
+      LOG.info("starting process for failure case - " + testPoint);
+      LOG.info(commands.toString());
+
+      try {
+        Process process = new ProcessBuilder(commands).start();
+        bspTaskProcess = process;
+        // Drain stderr and stdout so the child never blocks on a full pipe.
+        errorLog = startLogPump(process.getErrorStream());
+        infoLog = startLogPump(process.getInputStream());
+        return process.waitFor();
+      } catch (InterruptedException e) {
+        LOG.error("Error getting exit code of child process", e);
+        // The caller gave up on us (e.g. shutdownNow()): kill the child
+        // instead of leaving it running, and keep the interrupt status.
+        destroyProcess();
+        Thread.currentThread().interrupt();
+      } catch (Exception e) {
+        LOG.error("Error getting exit code of child process", e);
+      }
+      return -1;
+    }
+
+    /**
+     * Entry point of the child JVM: runs a BSPTask with {@link FaulTestBSP}
+     * as its BSP, talking to the parent over {@link BSPPeerProtocol} on
+     * 127.0.0.1, port {@code PORT}.
+     *
+     * @param args optional fault injection point: 0 = none (default),
+     *          1 = setup, 2 = bsp, 3 = cleanup
+     */
+    public static void main(String[] args) {
+      HamaConfiguration hamaConf = new HamaConfiguration();
+      hamaConf.setInt(Constants.GROOM_PING_PERIOD, 200);
+      hamaConf.setClass("bsp.work.class", FaulTestBSP.class, BSP.class);
+      hamaConf.setClass(SyncServiceFactory.SYNC_CLIENT_CLASS,
+          LocalBSPRunner.LocalSyncClient.class, SyncClient.class);
+
+      // Must be a valid TCP port number (0-65535).
+      hamaConf.setInt("bsp.master.port", 61002);
+
+      TaskAttemptID tid = new TaskAttemptID(new TaskID(new BSPJobID(
+          "job_201110102255", 1), 1), 1);
+
+      if (args.length > 0) {
+        hamaConf.setInt(TEST_POINT, Integer.parseInt(args[0]));
+      }
+
+      try {
+        BSPJob job = new BSPJob(hamaConf);
+        final BSPPeerProtocol proto = (BSPPeerProtocol) RPC.getProxy(
+            BSPPeerProtocol.class, BSPPeerProtocol.versionID,
+            new InetSocketAddress("127.0.0.1", PORT), hamaConf);
+
+        BSPTask task = new BSPTask();
+        task.setConf(job);
+
+        LOG.info("Testing failure case in process - "
+            + hamaConf.getInt(TEST_POINT, 0));
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+          public void run() {
+            try {
+              proto.close();
+            } catch (Exception e) {
+              // too late to log!
+            }
+          }
+        });
+
+        @SuppressWarnings("rawtypes")
+        BSPPeerImpl<?, ?, ?, ?, ?> bspPeer = new BSPPeerImpl(job, hamaConf,
+            tid, proto, 0, null, null, new Counters());
+        task.run(job, bspPeer, proto); // run the task
+      } catch (Exception e) {
+        LOG.error("Error in bsp child process.", e);
+      }
+    }
+  }
+
+  /*
+   * Test BSP class that has faults injected in each phase: a phase throws if
+   * TEST_POINT selects it (1 = setup, 2 = bsp, 3 = cleanup) and otherwise
+   * sleeps for 500 ms before completing.
+   */
+  private static class FaulTestBSP extends
+      BSP<NullWritable, NullWritable, NullWritable, NullWritable, NullWritable> {
+
+    @Override
+    public void setup(
+        BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, NullWritable> peer)
+        throws IOException, SyncException, InterruptedException {
+      if (peer.getConfiguration().getInt(TEST_POINT, 0) == 1) {
+        throw new RuntimeException("Error injected in setup");
+      }
+      Thread.sleep(500);
+      super.setup(peer);
+      LOG.info("Succesfully completed setup for bsp.");
+    }
+
+    @Override
+    public void cleanup(
+        BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, NullWritable> peer)
+        throws IOException {
+      if (peer.getConfiguration().getInt(TEST_POINT, 0) == 3) {
+        throw new RuntimeException("Error injected in cleanup");
+      }
+      try {
+        Thread.sleep(500);
+      } catch (Exception e) {
+        LOG.error("Interrupted BSP thread.", e);
+      }
+      super.cleanup(peer);
+      LOG.info("Succesfully cleaned up after bsp.");
+    }
+
+    @Override
+    public void bsp(
+        BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, NullWritable> peer)
+        throws IOException, SyncException, InterruptedException {
+      if (peer.getConfiguration().getInt(TEST_POINT, 0) == 2) {
+        throw new RuntimeException("Error injected in bsp function");
+      }
+      Thread.sleep(500);
+      LOG.info("Succesfully completed bsp.");
+    }
+
+  }
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    conf = new HamaConfiguration();
+
+    conf.setInt(Constants.GROOM_PING_PERIOD, 200);
+    conf.setClass("bsp.work.class", FaulTestBSP.class, BSP.class);
+    conf.setClass(SyncServiceFactory.SYNC_CLIENT_CLASS,
+        LocalBSPRunner.LocalSyncClient.class, SyncClient.class);
+
+    InetSocketAddress inetAddress = new InetSocketAddress(PORT);
+    groom = new MinimalGroomServer(conf);
+    workerServer = RPC.getServer(groom, inetAddress.getHostName(),
+        inetAddress.getPort(), conf);
+    workerServer.start();
+
+    LOG.info("Started RPC server");
+    conf.setInt("bsp.groom.rpc.port", inetAddress.getPort());
+
+    umbilical = (BSPPeerProtocol) RPC.getProxy(BSPPeerProtocol.class,
+        BSPPeerProtocol.versionID, inetAddress, conf);
+    LOG.info("Started the proxy connections");
+
+    this.testBSPTaskService = Executors.newScheduledThreadPool(1);
+  }
+
+  /**
+   * Returns the minimum number of pings expected between the first and the
+   * last recorded ping: twice that window divided by GROOM_PING_PERIOD.
+   * NOTE(review): the factor 2 presumably reflects the task pinging at half
+   * the configured period; confirm against BSPTask.
+   */
+  private int getExpectedPingCounts() {
+    long window = groom.getLastPingTime() - groom.getFirstPingTime();
+    return (int) (2 * window / conf.getInt(Constants.GROOM_PING_PERIOD, 5000));
+  }
+
+  private void checkIfPingTestPassed() {
+    int expectedPingCounts = getExpectedPingCounts();
+    int pingCount = groom.getPingCount();
+    LOG.info("Counted " + pingCount + " pings and expected "
+        + expectedPingCounts + " pings.");
+    assertTrue("Counted " + pingCount + " pings, expected at least "
+        + expectedPingCounts, pingCount >= expectedPingCounts);
+  }
+
+  /**
+   * Waits up to CHILD_TIMEOUT_MS for the child started by {@code runner} to
+   * exit and destroys it if it is still running after that.
+   *
+   * @return the child's exit code, or -1 if it could not be obtained
+   */
+  private int awaitExitCode(TestBSPProcessRunner runner,
+      Future<Integer> future) {
+    try {
+      return future.get(CHILD_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted Exception.", e);
+      Thread.currentThread().interrupt();
+    } catch (ExecutionException e) {
+      LOG.error("ExecutionException Exception.", e);
+    } catch (TimeoutException e) {
+      LOG.error("TimeoutException Exception.", e);
+      // Don't leave the child JVM running once we stop waiting for it.
+      runner.destroyProcess();
+    }
+    return -1;
+  }
+
+  /**
+   * Runs the child JVM with the given fault injection point, waits for it to
+   * finish and checks that it pinged the parent often enough.
+   */
+  private void runPingTest(int testPoint) {
+    conf.setInt(TEST_POINT, testPoint);
+    TestBSPProcessRunner runner = new TestBSPProcessRunner();
+    Future<Integer> future = this.testBSPTaskService.submit(runner);
+    awaitExitCode(runner, future);
+    checkIfPingTestPassed();
+  }
+
+  /*
+   * Test if we get the expected counts of ping.
+   */
+  public void testPing() {
+    runPingTest(0);
+    // With no pings at all the expected count is 0 too, so also require one
+    // ping; otherwise runPingTest's count check would pass vacuously.
+    assertTrue("The BSPTask never pinged its parent.",
+        groom.getPingCount() > 0);
+  }
+
+  /*
+   * Inject failure at different points and sense if the pings are coming or
+   * not.
+   */
+  public void testPingOnTaskSetupFailure() {
+    LOG.info("Testing ping failure case - 1");
+    runPingTest(1);
+  }
+
+  public void testPingOnTaskExecFailure() {
+    LOG.info("Testing ping failure case - 2");
+    runPingTest(2);
+  }
+
+  public void testPingOnTaskCleanupFailure() {
+    LOG.info("Testing ping failure case - 3");
+    runPingTest(3);
+  }
+
+  /**
+   * Stops the parent's RPC server after the child's first ping and checks
+   * that the child then exits by itself with status 69.
+   */
+  public void testBSPTaskSelfDestroy() {
+    LOG.info("Testing self kill on lost contact.");
+    conf.setInt(TEST_POINT, 0);
+
+    TestBSPProcessRunner runner = new TestBSPProcessRunner();
+    Future<Integer> future = this.testBSPTaskService.submit(runner);
+
+    // Wait for the first ping, but give up if the child has already exited
+    // or CHILD_TIMEOUT_MS has elapsed, instead of spinning forever.
+    long deadline = System.currentTimeMillis() + CHILD_TIMEOUT_MS;
+    try {
+      while (groom.getPingCount() == 0 && !future.isDone()
+          && System.currentTimeMillis() < deadline) {
+        Thread.sleep(100);
+      }
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted the timer for 1 sec.", e);
+      Thread.currentThread().interrupt();
+    }
+
+    workerServer.stop();
+    RPC.stopProxy(umbilical);
+    umbilical = null;
+    workerServer = null;
+
+    assertEquals(69, awaitExitCode(runner, future));
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    if (groom != null) {
+      groom.setPingCount(0);
+    }
+    if (umbilical != null) {
+      umbilical.close();
+      RPC.stopProxy(umbilical);
+      Thread.sleep(2000);
+    }
+    if (workerServer != null) {
+      workerServer.stop();
+    }
+    testBSPTaskService.shutdownNow();
+    Thread.sleep(2000);
+    super.tearDown();
+  }
+
+}

Propchange: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1296893&r1=1296892&r2=1296893&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Sun Mar  4 23:08:07 2012
@@ -17,6 +17,8 @@
  */
 package org.apache.hama.bsp;
 
+import java.net.InetSocketAddress;
+
 import junit.framework.TestCase;
 
 import org.apache.commons.logging.Log;
@@ -25,8 +27,15 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.TestBSPTaskFaults.MinimalGroomServer;
 import org.apache.hama.bsp.messages.ByteMessage;
+import org.apache.hama.bsp.sync.SyncClient;
+import org.apache.hama.bsp.sync.SyncServiceFactory;
+import org.apache.hama.ipc.BSPPeerProtocol;
 
 public class TestCheckpoint extends TestCase {
 
@@ -34,36 +43,123 @@ public class TestCheckpoint extends Test
 
   static final String checkpointedDir = "checkpoint/job_201110302255_0001/0/";
 
-  @SuppressWarnings("unchecked")
+  /**
+   * Writes a message bundle with BSPPeerImpl.checkpoint() and reads it back
+   * from the file system to check that the payload survived the round trip.
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
   public void testCheckpoint() throws Exception {
     Configuration config = new HamaConfiguration();
     FileSystem dfs = FileSystem.get(config);
 
     BSPPeerImpl bspTask = new BSPPeerImpl(config, dfs);
+    bspTask.setCurrentTaskStatus(new TaskStatus(new BSPJobID(),
+        new TaskAttemptID(), 1.0f, TaskStatus.State.RUNNING, "running",
+        "127.0.0.1", TaskStatus.Phase.STARTING, new Counters()));
     assertNotNull("BSPPeerImpl should not be null.", bspTask);
-    if(dfs.mkdirs(new Path("checkpoint"))) {
-      if(dfs.mkdirs(new Path("checkpoint/job_201110302255_0001"))) {
-        if(dfs.mkdirs(new Path("checkpoint/job_201110302255_0001/0")));
-      }
-    }
-    assertTrue("Make sure directory is created.", 
-               dfs.exists(new Path(checkpointedDir)));
+    // FileSystem.mkdirs() also creates any missing parent directories.
+    dfs.mkdirs(new Path(checkpointedDir));
+    assertTrue("Make sure directory is created.",
+        dfs.exists(new Path(checkpointedDir)));
     byte[] tmpData = "data".getBytes();
     BSPMessageBundle bundle = new BSPMessageBundle();
     bundle.addMessage(new ByteMessage("abc".getBytes(), tmpData));
     assertNotNull("Message bundle can not be null.", bundle);
     assertNotNull("Configuration should not be null.", config);
-    bspTask.checkpoint(checkpointedDir+"/attempt_201110302255_0001_000000_0", 
-                       bundle);
-    FSDataInputStream in = dfs.open(new Path(checkpointedDir+
-      "/attempt_201110302255_0001_000000_0"));
+    bspTask.checkpoint(checkpointedDir + "/attempt_201110302255_0001_000000_0",
+        bundle);
+    FSDataInputStream in = dfs.open(new Path(checkpointedDir
+        + "/attempt_201110302255_0001_000000_0"));
     BSPMessageBundle bundleRead = new BSPMessageBundle();
     bundleRead.readFields(in);
     in.close();
-    ByteMessage byteMsg = (ByteMessage)(bundleRead.getMessages()).get(0);
+    ByteMessage byteMsg = (ByteMessage) (bundleRead.getMessages()).get(0);
     String content = new String(byteMsg.getData());
-    LOG.info("Saved checkpointed content is "+content);
-    assertTrue("Message content should be the same.",  "data".equals(content)); 
+    LOG.info("Saved checkpointed content is " + content);
+    assertTrue("Message content should be the same.", "data".equals(content));
     dfs.delete(new Path("checkpoint"), true);
   }
+
+  /**
+   * Checks when a peer reports it is ready to checkpoint: not while
+   * checkpointing is disabled, then according to the checkpoint interval.
+   */
+  public void testCheckpointInterval() throws Exception {
+    HamaConfiguration conf = new HamaConfiguration();
+    conf.setClass(SyncServiceFactory.SYNC_CLIENT_CLASS,
+        LocalBSPRunner.LocalSyncClient.class, SyncClient.class);
+    conf.setBoolean(Constants.CHECKPOINT_ENABLED, false);
+
+    int port = 54321;
+    InetSocketAddress inetAddress = new InetSocketAddress(port);
+    MinimalGroomServer groom = new MinimalGroomServer(conf);
+    Server workerServer = RPC.getServer(groom, inetAddress.getHostName(),
+        inetAddress.getPort(), conf);
+    workerServer.start();
+    LOG.info("Started RPC server");
+    conf.setInt("bsp.groom.rpc.port", inetAddress.getPort());
+
+    BSPPeerProtocol umbilical = (BSPPeerProtocol) RPC.getProxy(
+        BSPPeerProtocol.class, BSPPeerProtocol.versionID, inetAddress, conf);
+    LOG.info("Started the proxy connections");
+
+    TaskAttemptID tid = new TaskAttemptID(new TaskID(new BSPJobID(
+        "job_201110102255", 1), 1), 1);
+    BSPPeerProtocol proto = null;
+    // No catch clause: an exception has to fail the test instead of being
+    // logged and swallowed.
+    try {
+      BSPJob job = new BSPJob(conf);
+      proto = (BSPPeerProtocol) RPC.getProxy(BSPPeerProtocol.class,
+          BSPPeerProtocol.versionID, new InetSocketAddress("127.0.0.1", port),
+          conf);
+      BSPTask task = new BSPTask();
+      task.setConf(job);
+
+      @SuppressWarnings("rawtypes")
+      BSPPeerImpl<?, ?, ?, ?, ?> bspPeer = new BSPPeerImpl(job, conf, tid,
+          proto, 0, null, null, new Counters());
+
+      bspPeer.setCurrentTaskStatus(new TaskStatus(new BSPJobID(), tid, 1.0f,
+          TaskStatus.State.RUNNING, "running", "127.0.0.1",
+          TaskStatus.Phase.STARTING, new Counters()));
+
+      assertFalse(bspPeer.isReadyToCheckpoint());
+
+      conf.setBoolean(Constants.CHECKPOINT_ENABLED, true);
+      conf.setInt(Constants.CHECKPOINT_INTERVAL, 3);
+
+      bspPeer.sync();
+      LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
+          + bspPeer.getSuperstepCount());
+      assertFalse(bspPeer.isReadyToCheckpoint());
+      bspPeer.sync();
+      LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
+          + bspPeer.getSuperstepCount());
+      assertFalse(bspPeer.isReadyToCheckpoint());
+      bspPeer.sync();
+      LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
+          + bspPeer.getSuperstepCount());
+      assertTrue(bspPeer.isReadyToCheckpoint());
+
+      job.setCheckPointInterval(5);
+      bspPeer.sync();
+      LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
+          + bspPeer.getSuperstepCount());
+      assertFalse(bspPeer.isReadyToCheckpoint());
+      bspPeer.sync();
+      LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
+          + bspPeer.getSuperstepCount());
+      assertTrue(bspPeer.isReadyToCheckpoint());
+    } finally {
+      if (proto != null) {
+        RPC.stopProxy(proto);
+      }
+      umbilical.close();
+      RPC.stopProxy(umbilical);
+      Thread.sleep(2000);
+      workerServer.stop();
+      Thread.sleep(2000);
+    }
+  }
 }

Modified: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java?rev=1296893&r1=1296892&r2=1296893&view=diff
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java (original)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java Sun Mar  4 23:08:07 2012
@@ -40,7 +40,7 @@ public class BSPRunner {
   private TaskAttemptID id;
   private BSPPeerImpl<?, ?, ?, ?, ? extends Writable> peer;
   private Counters counters = new Counters();
-  
+
   @SuppressWarnings("rawtypes")
   Class<? extends BSP> bspClass;
 
@@ -87,14 +87,61 @@ public class BSPRunner {
+  /**
+   * Runs setup and bsp, then always runs cleanup and closes the peer. Every
+   * exception is logged, and the first one raised is rethrown at the end.
+   *
+   * @throws Exception the first exception thrown by setup, bsp, cleanup or
+   *           by closing the peer
+   */
   @SuppressWarnings({ "unchecked", "rawtypes" })
   public void startComputation() throws Exception {
     BSP bspInstance = ReflectionUtils.newInstance(bspClass, conf);
+    // Throw the first exception and log the remaining.
+    Exception firstException = null;
     try {
       bspInstance.setup(peer);
       bspInstance.bsp(peer);
     } catch (Exception e) {
-      throw e;
+      LOG.error("Error occured while running bsp function.", e);
+      firstException = e;
     } finally {
-      bspInstance.cleanup(peer);
-      peer.close();
+      firstException = cleanupAndClose(bspInstance, firstException);
+    }
+    // Rethrow outside of any finally block: throwing from finally would
+    // replace an Error still propagating from setup, bsp or cleanup.
+    if (firstException != null) {
+      throw firstException;
     }
   }
+
+  /**
+   * Runs the cleanup phase of {@code bspInstance} and then closes the peer,
+   * closing it even if cleanup fails. Exceptions from either step are logged.
+   *
+   * @param bspInstance the BSP whose cleanup phase is run
+   * @param firstException exception already raised by setup/bsp, or null
+   * @return {@code firstException} if non-null, else the first exception
+   *         thrown by cleanup or close, or null if both succeeded
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  private Exception cleanupAndClose(BSP bspInstance, Exception firstException) {
+    Exception result = firstException;
+    try {
+      bspInstance.cleanup(peer);
+    } catch (Exception e) {
+      LOG.error("Cleaning up after bsp function.", e);
+      if (result == null) {
+        result = e;
+      }
+    } finally {
+      // Close the peer even if cleanup threw an Error.
+      try {
+        peer.close();
+      } catch (Exception e) {
+        LOG.error("Error closing the bsp peer", e);
+        if (result == null) {
+          result = e;
+        }
+      }
+    }
+    return result;
+  }