You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by su...@apache.org on 2012/08/05 13:07:49 UTC

svn commit: r1369551 [2/3] - in /hama/branches/HAMA-505-branch: conf/ core/src/main/java/org/apache/hama/ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/ft/ core/src/main/java/org/apache/hama/bsp/message/ core/src/main/j...

Added: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java?rev=1369551&view=auto
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java (added)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java Sun Aug  5 11:07:48 2012
@@ -0,0 +1,544 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.ft;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.Constants;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.BSPJobID;
+import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.GroomServerAction;
+import org.apache.hama.bsp.GroomServerStatus;
+import org.apache.hama.bsp.JobInProgress;
+import org.apache.hama.bsp.RecoverTaskAction;
+import org.apache.hama.bsp.Task;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.TaskID;
+import org.apache.hama.bsp.TaskInProgress;
+import org.apache.hama.bsp.TaskStatus;
+import org.apache.hama.bsp.message.MessageEventListener;
+import org.apache.hama.bsp.message.MessageManager;
+import org.apache.hama.bsp.sync.MasterSyncClient;
+import org.apache.hama.bsp.sync.PeerSyncClient;
+import org.apache.hama.bsp.taskallocation.BSPResource;
+import org.apache.hama.bsp.taskallocation.TaskAllocationStrategy;
+
+/**
+ * <code>AsyncRcvdMsgCheckpointImpl</code> Checkpoint service defines the fault
+ * tolerance strategy by checkpointing of messages sent across peers. On
+ * failure, all the tasks are restarted from the last superstep for which all
+ * the peers successfully checkpointed the messages.
+ * 
+ */
+public class AsyncRcvdMsgCheckpointImpl<M extends Writable> implements
+    BSPFaultTolerantService<M> {
+
+  private static final Log LOG = LogFactory
+      .getLog(AsyncRcvdMsgCheckpointImpl.class);
+
+  /**
+   * It is responsible to find the smallest superstep for which the
+   * checkpointing is done and then restart all the peers from that superstep.
+   */
+  private static class CheckpointMasterService implements
+      FaultTolerantMasterService {
+
+    private Configuration conf;
+    private TaskInProgress tasks[];
+    private BSPJobID jobId;
+    private int maxTaskAttempts;
+    private int currentAttemptId;
+    private MasterSyncClient masterSyncClient;
+    private TaskAllocationStrategy allocationStrategy;
+
+    /**
+     * Binds this master-side service to a job and resets the recovery attempt
+     * counter to zero.
+     * 
+     * @param jobId The identifier of the job.
+     * @param maxTaskAttempts Number of attempts allowed for recovering from
+     *          failure.
+     * @param tasks The list of tasks in the job.
+     * @param conf The job configuration object.
+     * @param masterClient The synchronization client used by BSPMaster.
+     * @param allocationStrategy The task allocation strategy of the job.
+     */
+    public void initialize(BSPJobID jobId, int maxTaskAttempts,
+        TaskInProgress[] tasks, Configuration conf,
+        MasterSyncClient masterClient, TaskAllocationStrategy allocationStrategy) {
+      // Job identity and settings.
+      this.jobId = jobId;
+      this.conf = conf;
+      this.tasks = tasks;
+
+      // Collaborators used while recovering.
+      this.masterSyncClient = masterClient;
+      this.allocationStrategy = allocationStrategy;
+
+      // Recovery budget: no attempt has been consumed yet.
+      this.maxTaskAttempts = maxTaskAttempts;
+      this.currentAttemptId = 0;
+    }
+
+    /**
+     * Reports whether another recovery round may still be started. The answer
+     * depends only on the number of recovery rounds already run, not on the
+     * given task.
+     */
+    @Override
+    public boolean isRecoveryPossible(TaskInProgress tip) {
+      final boolean attemptsExhausted = currentAttemptId >= maxTaskAttempts;
+      return !attemptsExhausted;
+    }
+
+    /**
+     * Returns true when the attempt id of the given task is greater than the
+     * number of recovery rounds this service has run so far.
+     */
+    @Override
+    public boolean isAlreadyRecovered(TaskInProgress tip) {
+      return tip.getCurrentTaskAttemptId().getId() > currentAttemptId;
+    }
+
+    /**
+     * Determines the lowest superstep recorded under the job's "checkpoint"
+     * key by the tasks and restarts the job from it. When the number of
+     * recorded entries differs from the number of tasks, or any entry cannot
+     * be read, the job is restarted with superstep -1 instead. Each completed
+     * call consumes one recovery attempt.
+     */
+    @Override
+    public void recoverTasks(JobInProgress jip,
+        Map<String, GroomServerStatus> groomStatuses,
+        TaskInProgress[] failedTasksInProgress,
+        TaskInProgress[] allTasksInProgress,
+        Map<GroomServerStatus, Integer> taskCountInGroomMap,
+        Map<GroomServerStatus, List<GroomServerAction>> actionMap)
+        throws IOException {
+
+      // Index the failed tasks by id so restartJob can tell them apart.
+      Map<TaskID, TaskInProgress> recoverySet = new HashMap<TaskID, TaskInProgress>(
+          2 * failedTasksInProgress.length);
+      for (TaskInProgress failed : failedTasksInProgress) {
+        recoverySet.put(failed.getTaskId(), failed);
+      }
+
+      String[] taskProgress = this.masterSyncClient.getChildKeySet(
+          this.masterSyncClient.constructKey(jobId, "checkpoint"), null);
+
+      if (LOG.isDebugEnabled()) {
+        StringBuilder list = new StringBuilder(25 * taskProgress.length);
+        list.append("got child key set").append(taskProgress.length)
+            .append("/").append(tasks.length).append(" ");
+        for (String entry : taskProgress) {
+          list.append(entry).append(",");
+        }
+        LOG.debug(list);
+      }
+
+      // -1 means "no usable checkpoint": restart from the beginning.
+      long restartSuperstep = -1L;
+
+      if (taskProgress.length == this.tasks.length) {
+        long lowestSuperstepNumber = Long.MAX_VALUE;
+
+        for (String taskKey : taskProgress) {
+          ArrayWritable progressInformation = new ArrayWritable(
+              LongWritable.class);
+          boolean found = this.masterSyncClient.getInformation(
+              this.masterSyncClient.constructKey(jobId, "checkpoint", taskKey),
+              progressInformation);
+
+          if (!found) {
+            // One unreadable entry invalidates the whole checkpoint.
+            lowestSuperstepNumber = -1L;
+            break;
+          }
+
+          LongWritable superstepProgress = (LongWritable) progressInformation
+              .get()[0];
+          if (superstepProgress == null
+              || superstepProgress.get() >= lowestSuperstepNumber) {
+            continue;
+          }
+
+          lowestSuperstepNumber = superstepProgress.get();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Got superstep number " + lowestSuperstepNumber
+                + " from " + taskKey);
+          }
+        }
+
+        clearClientForSuperstep(lowestSuperstepNumber);
+        restartSuperstep = lowestSuperstepNumber;
+      }
+
+      restartJob(restartSuperstep, groomStatuses, recoverySet,
+          allTasksInProgress, taskCountInGroomMap, actionMap);
+
+      ++currentAttemptId;
+    }
+
+    /**
+     * Removes the job's "sync" entry from the master sync client.
+     * 
+     * @param superstep currently unused; the "sync" key is removed regardless
+     *          of its value.
+     */
+    private void clearClientForSuperstep(long superstep) {
+      this.masterSyncClient.remove(
+          this.masterSyncClient.constructKey(this.jobId, "sync"), null);
+    }
+
+    private void populateAction(Task task, long superstep,
+        GroomServerStatus groomStatus,
+        Map<GroomServerStatus, List<GroomServerAction>> actionMap) {
+      List<GroomServerAction> list = actionMap.get(groomStatus);
+      if (!actionMap.containsKey(groomStatus)) {
+        list = new ArrayList<GroomServerAction>();
+        actionMap.put(groomStatus, list);
+      }
+      list.add(new RecoverTaskAction(task, superstep));
+
+    }
+
+    private void restartTask(TaskInProgress tip, long superstep,
+        Map<String, GroomServerStatus> groomStatuses,
+        Map<GroomServerStatus, List<GroomServerAction>> actionMap) {
+      GroomServerStatus serverStatus = tip.getGroomServerStatus();
+      Task task = tip.constructTask(serverStatus);
+      populateAction(task, superstep, serverStatus, actionMap);
+
+    }
+
+    /**
+     * Builds the recovery actions for every task of the job.
+     * <p>
+     * For a non-negative superstep, each failed task is re-allocated to a
+     * groom chosen by the allocation strategy, preferring the hosts that hold
+     * the first block of the task's checkpoint file; every other task is
+     * restarted on the groom it already has. For a negative superstep the
+     * job restarts from the beginning.
+     * 
+     * @param superstep superstep to restart from, or a negative value to
+     *          restart from the beginning.
+     * @param groomStatuses map of groom name to groom status.
+     * @param recoveryMap the failed tasks, keyed by task id.
+     * @param allTasks all tasks of the job.
+     * @param taskCountInGroomMap number of tasks per groom; decremented here
+     *          for the groom of each failed task.
+     * @param actionMap receives the actions to send to each groom.
+     * @throws IOException if the checkpoint file system cannot be queried.
+     */
+    private void restartJob(long superstep,
+        Map<String, GroomServerStatus> groomStatuses,
+        Map<TaskID, TaskInProgress> recoveryMap, TaskInProgress[] allTasks,
+        Map<GroomServerStatus, Integer> taskCountInGroomMap,
+        Map<GroomServerStatus, List<GroomServerAction>> actionMap)
+        throws IOException {
+      String path = conf.get("bsp.checkpoint.prefix_path", "/checkpoint/");
+
+      if (superstep >= 0) {
+        FileSystem fileSystem = FileSystem.get(conf);
+        for (int i = 0; i < allTasks.length; ++i) {
+          if (recoveryMap.containsKey(allTasks[i].getTaskId())) {
+
+            // Update task count in map.
+            // TODO: This should be a responsibility of GroomServerStatus
+            Integer count = taskCountInGroomMap.get(allTasks[i]
+                .getGroomServerStatus());
+            if (count != null) {
+              count = count.intValue() - 1;
+              taskCountInGroomMap
+                  .put(allTasks[i].getGroomServerStatus(), count);
+            }
+
+            StringBuilder ckptPath = new StringBuilder(path);
+            ckptPath.append(this.jobId.toString());
+            ckptPath.append("/").append(superstep).append("/")
+                .append(allTasks[i].getTaskId().getId());
+            Path checkpointPath = new Path(ckptPath.toString());
+
+            String[] hosts = null;
+            if (fileSystem.exists(checkpointPath)) {
+              FileStatus fileStatus = fileSystem.getFileStatus(checkpointPath);
+              BlockLocation[] blocks = fileSystem.getFileBlockLocations(
+                  fileStatus, 0, fileStatus.getLen());
+              // An existing but empty checkpoint file has no block
+              // locations; indexing blocks[0] unguarded would abort recovery.
+              if (blocks != null && blocks.length > 0) {
+                hosts = blocks[0].getHosts();
+              }
+            }
+            if (hosts == null) {
+              // No locality hint available: offer every known groom.
+              hosts = groomStatuses.keySet().toArray(
+                  new String[groomStatuses.size()]);
+            }
+
+            GroomServerStatus serverStatus = this.allocationStrategy
+                .getGroomToAllocate(groomStatuses, hosts, taskCountInGroomMap,
+                    new BSPResource[0], allTasks[i]);
+            Task task = allTasks[i].constructTask(serverStatus);
+            populateAction(task, superstep, serverStatus, actionMap);
+
+          } else {
+            restartTask(allTasks[i], superstep, groomStatuses, actionMap);
+          }
+        }
+      } else {
+        // Start the task from the beginning.
+        for (int i = 0; i < allTasks.length; ++i) {
+          if (recoveryMap.containsKey(allTasks[i].getTaskId())) {
+            // NOTE(review): the groom returned here is discarded and no
+            // action is added to actionMap for the failed task - confirm
+            // that the caller schedules failed tasks itself in this case.
+            this.allocationStrategy.getGroomToAllocate(groomStatuses,
+                this.allocationStrategy.selectGrooms(groomStatuses,
+                    taskCountInGroomMap, new BSPResource[0], allTasks[i]),
+                taskCountInGroomMap, new BSPResource[0], allTasks[i]);
+          } else {
+            restartTask(allTasks[i], superstep, groomStatuses, actionMap);
+          }
+        }
+      }
+    }
+
+  }// end of CheckpointMasterService
+
+  /**
+   * Creates a {@link CheckpointPeerService} and initializes it with the given
+   * peer-side collaborators before handing it back.
+   */
+  @Override
+  public FaultTolerantPeerService<M> constructPeerFaultTolerance(BSPJob job,
+      @SuppressWarnings("rawtypes")
+      BSPPeer bspPeer, PeerSyncClient syncClient,
+      InetSocketAddress peerAddress, TaskAttemptID taskAttemptId,
+      long superstep, Configuration conf, MessageManager<M> messenger)
+      throws Exception {
+    final CheckpointPeerService<M> peerService = new CheckpointPeerService<M>();
+    peerService.initialize(job, bspPeer, syncClient, peerAddress,
+        taskAttemptId, superstep, conf, messenger);
+    return peerService;
+  }
+
+  /**
+   * Creates a {@link CheckpointMasterService} and initializes it for the
+   * given job before handing it back.
+   */
+  @Override
+  public FaultTolerantMasterService constructMasterFaultTolerance(
+      BSPJobID jobId, int maxTaskAttempts, TaskInProgress[] tasks,
+      Configuration conf, MasterSyncClient masterClient,
+      TaskAllocationStrategy allocationStrategy) throws Exception {
+    final CheckpointMasterService masterService = new CheckpointMasterService();
+    masterService.initialize(jobId, maxTaskAttempts, tasks, conf,
+        masterClient, allocationStrategy);
+    return masterService;
+  }
+
+  /**
+   * Initializes the peer fault tolerance by checkpointing service. For
+   * recovery, on peer initialization, it reads all the checkpointed messages to
+   * recover the state of the peer. During normal working, it checkpoints all
+   * the messages it received in the previous superstep. It also stores the
+   * superstep progress in the global synchronization area.
+   * 
+   */
+  public static class CheckpointPeerService<M extends Writable> implements
+      FaultTolerantPeerService<M>, MessageEventListener<M> {
+
+    private BSPJob job;
+    @SuppressWarnings("rawtypes")
+    private BSPPeer peer;
+    private PeerSyncClient syncClient;
+    private long superstep;
+    private Configuration conf;
+    private MessageManager<M> messenger;
+    private FileSystem fs;
+    private int checkPointInterval;
+    volatile private long lastCheckPointStep;
+    volatile private boolean checkpointState;
+    volatile private FSDataOutputStream checkpointStream;
+    volatile private long checkpointMessageCount;
+
+    /**
+     * Stores the peer-side collaborators, obtains the file system for the
+     * configuration and reads the checkpoint settings.
+     * 
+     * @param job the job this peer belongs to.
+     * @param bspPeer the peer being protected.
+     * @param syncClient the synchronization client used by the peer.
+     * @param peerAddress not used by this implementation.
+     * @param taskAttemptId not used by this implementation.
+     * @param superstep the superstep from which the peer is initialized; when
+     *          positive it becomes the last checkpointed step, otherwise that
+     *          step starts at 1.
+     * @param conf job configuration object.
+     * @param messenger the messaging system between the peers.
+     * @throws IOException if the file system cannot be obtained.
+     */
+    public void initialize(BSPJob job, @SuppressWarnings("rawtypes")
+    BSPPeer bspPeer, PeerSyncClient syncClient, InetSocketAddress peerAddress,
+        TaskAttemptID taskAttemptId, long superstep, Configuration conf,
+        MessageManager<M> messenger) throws IOException {
+
+      this.job = job;
+      this.peer = bspPeer;
+      this.syncClient = syncClient;
+      this.superstep = superstep;
+      this.conf = conf;
+      this.messenger = messenger;
+      this.fs = FileSystem.get(conf);
+
+      // Read once; the original assigned this field twice in a row.
+      this.checkPointInterval = conf.getInt(Constants.CHECKPOINT_INTERVAL,
+          Constants.DEFAULT_CHECKPOINT_INTERVAL);
+      this.checkpointState = conf.getBoolean(Constants.CHECKPOINT_ENABLED,
+          false);
+
+      if (superstep > 0) {
+        this.lastCheckPointStep = this.superstep;
+      } else {
+        this.lastCheckPointStep = 1;
+      }
+      this.checkpointMessageCount = 0L;
+    }
+
+    /**
+     * Builds the checkpoint file location for a superstep as
+     * {@code <prefix><jobId>/<step>/<peerIndex>}.
+     * <p>
+     * NOTE(review): the default prefix here is the relative "checkpoint/"
+     * while CheckpointMasterService.restartJob defaults to "/checkpoint/" -
+     * confirm whether the two are meant to resolve to the same directory.
+     */
+    private String checkpointPath(long step) {
+      StringBuilder location = new StringBuilder();
+      location.append(conf.get("bsp.checkpoint.prefix_path", "checkpoint/"));
+      location.append(job.getJobID().toString());
+      location.append("/").append(step);
+      location.append("/").append(peer.getPeerIndex());
+      String ckptPath = location.toString();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Received Messages are to be saved to " + ckptPath);
+      }
+      return ckptPath;
+    }
+
+    /**
+     * Restores checkpointed messages when the peer starts in the RECOVERING
+     * state, then registers this service as a message listener.
+     * <p>
+     * The progress record stored under the job's "checkpoint" key for this
+     * peer supplies the superstep and the number of messages to read back;
+     * the messages are fed to the messenger through
+     * {@code loopBackMessages}.
+     * 
+     * @param state the state the peer was started in.
+     * @return always {@link TaskStatus.State#RUNNING}.
+     * @throws IOException if no progress record exists or the checkpoint file
+     *           ends before all messages were read.
+     * @throws Exception if a message class cannot be loaded or read.
+     */
+    @Override
+    public TaskStatus.State onPeerInitialized(TaskStatus.State state)
+        throws Exception {
+      if (this.superstep >= 0 && state.equals(TaskStatus.State.RECOVERING)) {
+        ArrayWritable progressArr = new ArrayWritable(LongWritable.class);
+        boolean result = this.syncClient.getInformation(
+            this.syncClient.constructKey(job.getJobID(), "checkpoint",
+                String.valueOf(peer.getPeerIndex())), progressArr);
+
+        if (!result) {
+          throw new IOException("No data found to restore peer state.");
+        }
+
+        Writable[] progressInfo = progressArr.get();
+        long superstepProgress = ((LongWritable) progressInfo[0]).get();
+        long numMessages = ((LongWritable) progressInfo[1]).get();
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Got sstep =" + superstepProgress + " numMessages = "
+              + numMessages + " this.superstep = " + this.superstep);
+        }
+
+        if (numMessages > 0) {
+          Path path = new Path(checkpointPath(superstepProgress));
+          FSDataInputStream in = this.fs.open(path);
+          BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
+          try {
+            for (int i = 0; i < numMessages; ++i) {
+              String className = in.readUTF();
+              @SuppressWarnings("unchecked")
+              M message = (M) ReflectionUtils.newInstance(
+                  Class.forName(className), conf);
+              message.readFields(in);
+              bundle.addMessage(message);
+            }
+            messenger.loopBackMessages(bundle);
+          } catch (EOFException e) {
+            LOG.error("Error recovering from checkpointing", e);
+            throw new IOException(e);
+          } finally {
+            // Close only the stream opened here. The file system handle is
+            // kept open because onMessageReceived still uses it to create
+            // later checkpoint files.
+            in.close();
+          }
+        }
+      }
+      this.messenger.registerListener(this);
+      return TaskStatus.State.RUNNING;
+
+    }
+
+    public final boolean isReadyToCheckpoint() {
+
+      checkPointInterval = conf.getInt(Constants.CHECKPOINT_INTERVAL, 1);
+      LOG.info(new StringBuffer(1000).append("Enabled = ")
+          .append(conf.getBoolean(Constants.CHECKPOINT_ENABLED, false))
+          .append(" checkPointInterval = ").append(checkPointInterval)
+          .append(" lastCheckPointStep = ").append(lastCheckPointStep)
+          .append(" getSuperstepCount() = ").append(peer.getSuperstepCount())
+          .toString());
+      if (LOG.isDebugEnabled())
+        LOG.debug(new StringBuffer(1000).append("Enabled = ")
+            .append(conf.getBoolean(Constants.CHECKPOINT_ENABLED, false))
+            .append(" checkPointInterval = ").append(checkPointInterval)
+            .append(" lastCheckPointStep = ").append(lastCheckPointStep)
+            .append(" getSuperstepCount() = ").append(peer.getSuperstepCount())
+            .toString());
+
+      return (conf.getBoolean(Constants.CHECKPOINT_ENABLED, false)
+          && (checkPointInterval != 0) && (((int) ((peer.getSuperstepCount() + 1) - lastCheckPointStep)) >= checkPointInterval));
+
+    }
+
+    /** Intentionally empty: this service does nothing before the barrier. */
+    @Override
+    public void beforeBarrier() throws Exception {
+    }
+
+    /** Intentionally empty: this service does nothing during the barrier. */
+    @Override
+    public void duringBarrier() throws Exception {
+    }
+
+    /**
+     * Finishes the checkpoint of the superstep that just ended, if one was in
+     * progress, and decides whether the next superstep is checkpointed.
+     * <p>
+     * Finishing means closing the open checkpoint stream and storing the pair
+     * (superstep, message count) under the job's "checkpoint" key for this
+     * peer. The message counter is reset in either case.
+     */
+    @Override
+    public void afterBarrier() throws Exception {
+
+      synchronized (this) {
+        if (checkpointState) {
+
+          if (checkpointStream != null) {
+            this.checkpointStream.close();
+            this.checkpointStream = null;
+          }
+
+          lastCheckPointStep = peer.getSuperstepCount();
+
+          // Progress record: [0] = superstep, [1] = number of messages.
+          ArrayWritable progressRecord = new ArrayWritable(LongWritable.class);
+          progressRecord.set(new Writable[] {
+              new LongWritable(lastCheckPointStep),
+              new LongWritable(checkpointMessageCount) });
+
+          String peerIndex = String.valueOf(peer.getPeerIndex());
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Storing lastCheckPointStep = " + lastCheckPointStep
+                + " checkpointMessageCount = " + checkpointMessageCount
+                + " for peer = " + peerIndex);
+          }
+
+          this.syncClient.storeInformation(this.syncClient.constructKey(
+              this.job.getJobID(), "checkpoint", peerIndex), progressRecord,
+              true, null);
+        }
+        checkpointState = isReadyToCheckpoint();
+        checkpointMessageCount = 0;
+      }
+
+      LOG.info("checkpoingNext = " + checkpointState
+          + " checkpointMessageCount = " + checkpointMessageCount);
+    }
+
+    /** Intentionally empty: no action is taken for this callback. */
+    @Override
+    public void onInitialized() {
+
+    }
+
+    /**
+     * Intentionally empty: only received messages are checkpointed (see
+     * {@link #onMessageReceived}).
+     */
+    @Override
+    public void onMessageSent(String peerName, M message) {
+    }
+
+    /**
+     * Appends a received message to the checkpoint file of the upcoming
+     * superstep when checkpointing is active for the current superstep.
+     * <p>
+     * Each record is the message's class name followed by its serialized
+     * fields. The checkpoint file is created on the first message. A null
+     * message is logged and ignored.
+     * 
+     * @param message the message that was received.
+     * @throws RuntimeException if the checkpoint file cannot be created or
+     *           written.
+     */
+    @Override
+    public void onMessageReceived(M message) {
+      if (message == null) {
+        LOG.error("Message M is found to be null");
+        // Nothing can be written for a null message; continuing would fail
+        // with a NullPointerException below.
+        return;
+      }
+
+      String checkpointedPath = null;
+
+      synchronized (this) {
+        if (checkpointState) {
+          if (this.checkpointStream == null) {
+            checkpointedPath = checkpointPath(peer.getSuperstepCount() + 1);
+            try {
+              LOG.info("Creating path " + checkpointedPath);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Creating path " + checkpointedPath);
+              }
+              checkpointStream = this.fs.create(new Path(checkpointedPath));
+            } catch (IOException ioe) {
+              LOG.error("Fail checkpointing messages to " + checkpointedPath,
+                  ioe);
+              throw new RuntimeException("Failed opening HDFS file "
+                  + checkpointedPath, ioe);
+            }
+          }
+          try {
+            // Write the binary name: recovery loads the class with
+            // Class.forName, which cannot resolve the canonical name of a
+            // nested class.
+            checkpointStream.writeUTF(message.getClass().getName());
+            message.write(checkpointStream);
+            // Count only messages that were written completely.
+            ++checkpointMessageCount;
+          } catch (IOException ioe) {
+            LOG.error("Fail checkpointing messages to " + checkpointedPath, ioe);
+            throw new RuntimeException("Failed writing to HDFS file "
+                + checkpointedPath, ioe);
+          }
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("message count = " + checkpointMessageCount);
+          }
+        }
+      }
+
+    }
+
+    /**
+     * Intentionally empty. NOTE(review): an open checkpoint stream is not
+     * closed here; it is only closed in afterBarrier - confirm that is
+     * sufficient when the messenger shuts down.
+     */
+    @Override
+    public void onClose() {
+
+    }
+
+  }
+
+}

Added: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/BSPFaultTolerantService.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/BSPFaultTolerantService.java?rev=1369551&view=auto
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/BSPFaultTolerantService.java (added)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/BSPFaultTolerantService.java Sun Aug  5 11:07:48 2012
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.ft;
+
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.BSPJobID;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.TaskInProgress;
+import org.apache.hama.bsp.message.MessageManager;
+import org.apache.hama.bsp.sync.MasterSyncClient;
+import org.apache.hama.bsp.sync.PeerSyncClient;
+import org.apache.hama.bsp.taskallocation.TaskAllocationStrategy;
+
+/**
+ * <code>BSPFaultTolerantService</code> defines the fault tolerance service
+ * behavior. The fault tolerance service is a feature of a running job and not
+ * of the system. An implementation is responsible for creating two objects.
+ * The first, a <code>FaultTolerantMasterService</code>, is used by the job at
+ * BSPMaster to handle fault tolerance related steps at the master. The
+ * second, a <code>FaultTolerantPeerService</code>, implements the fault
+ * tolerance related steps for recovery inside <code>BSPPeer</code> (in each
+ * of the BSP peers doing computations).
+ * 
+ * @param <M> the type of message exchanged between peers.
+ */
+public interface BSPFaultTolerantService<M extends Writable> {
+  
+  /**
+   * The token by which a job can register its fault-tolerance service.
+   * Presumably a configuration key naming the implementing class - confirm
+   * against the code that reads it.
+   */
+  public static final String FT_SERVICE_CONF = "hama.ft.conf.class";
+
+  /**
+   * Creates the instance of <code>FaultTolerantMasterService</code> that would
+   * handle fault-tolerance related steps at BSPMaster task scheduler.
+   * 
+   * @param jobId The identifier of the job.
+   * @param maxTaskAttempts Number of attempts allowed for recovering from
+   *          failure.
+   * @param tasks The list of tasks in the job.
+   * @param conf The job configuration object.
+   * @param masterClient The synchronization client used by BSPMaster.
+   * @param allocationStrategy The task allocation strategy of the job.
+   * @return An instance of class inheriting
+   *         <code>FaultTolerantMasterService</code>
+   * @throws Exception if the service cannot be created.
+   */
+  public FaultTolerantMasterService constructMasterFaultTolerance(
+      BSPJobID jobId, int maxTaskAttempts, TaskInProgress[] tasks,
+      Configuration conf, MasterSyncClient masterClient,
+      TaskAllocationStrategy allocationStrategy) throws Exception;
+
+  /**
+   * Creates an instance of <code>FaultTolerantPeerService</code> which defines
+   * the steps that have to be taken inside a peer for fault-tolerance.
+   * 
+   * @param job The job the peer belongs to.
+   * @param bspPeer The peer
+   * @param syncClient The synchronization client used by peer.
+   * @param peerAddress The address of the peer.
+   * @param taskAttemptId The identifier of the task attempt run by the peer.
+   * @param superstep The superstep from which the peer is initialized.
+   * @param conf job configuration object
+   * @param messenger The messaging system between the peers
+   * @return An instance of class inheriting
+   *         <code>FaultTolerantPeerService</code>
+   * @throws Exception if the service cannot be created.
+   */
+  public FaultTolerantPeerService<M> constructPeerFaultTolerance(BSPJob job,
+      @SuppressWarnings("rawtypes")
+      BSPPeer bspPeer, PeerSyncClient syncClient,
+      InetSocketAddress peerAddress, TaskAttemptID taskAttemptId,
+      long superstep, Configuration conf, MessageManager<M> messenger)
+      throws Exception;
+
+}

Added: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/FaultTolerantMasterService.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/FaultTolerantMasterService.java?rev=1369551&view=auto
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/FaultTolerantMasterService.java (added)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/FaultTolerantMasterService.java Sun Aug  5 11:07:48 2012
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.ft;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hama.bsp.GroomServerAction;
+import org.apache.hama.bsp.GroomServerStatus;
+import org.apache.hama.bsp.JobInProgress;
+import org.apache.hama.bsp.TaskInProgress;
+
+/**
+ * <code>FaultTolerantMasterService</code> defines the behavior of the object
+ * responsible for doing fault-tolerance related work on the BSPMaster task
+ * scheduler. This is defined per job.
+ */
+public interface FaultTolerantMasterService {
+
+  /**
+   * Returns true if recovery of the task in question is possible.
+   * 
+   * @param tip <code>TaskInProgress</code> object that represents the task.
+   * @return true if recovery is possible.
+   */
+  public boolean isRecoveryPossible(TaskInProgress tip);
+
+  /**
+   * Returns true if the task is already slated to be recovered from failure.
+   * 
+   * @param tip <code>TaskInProgress</code> object that represents the task.
+   * @return true if the task/job is already in the process of recovery.
+   */
+  public boolean isAlreadyRecovered(TaskInProgress tip);
+
+  /**
+   * From the list of tasks that have failed, provide the task scheduler a set
+   * of actions and the grooms to which these actions must be sent for fault
+   * recovery.
+   * 
+   * @param jip The job in question which has to be recovered.
+   * @param groomStatuses The map of grooms to their statuses.
+   * @param failedTasksInProgress The list of failed tasks.
+   * @param allTasksInProgress The list of all tasks in the job.
+   * @param taskCountInGroomMap The map of groom status to a task count for
+   *          that groom.
+   * @param actionMap The map of groom to the list of actions that are to be
+   *          taken on that groom.
+   * @throws IOException if the recovery actions cannot be prepared.
+   */
+  public void recoverTasks(JobInProgress jip,
+      Map<String, GroomServerStatus> groomStatuses,
+      TaskInProgress[] failedTasksInProgress,
+      TaskInProgress[] allTasksInProgress,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap,
+      Map<GroomServerStatus, List<GroomServerAction>> actionMap)
+      throws IOException;
+}

Added: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/FaultTolerantPeerService.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/FaultTolerantPeerService.java?rev=1369551&view=auto
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/FaultTolerantPeerService.java (added)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/FaultTolerantPeerService.java Sun Aug  5 11:07:48 2012
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.ft;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.TaskStatus;
+
+/**
+ * <code>FaultTolerantPeerService</code> defines the steps required to be
+ * performed by peers for fault-tolerance. At different stages of peer
+ * execution, the service can take necessary measures to ensure that the peer
+ * computations could be recovered if any of them failed.
+ */
+public interface FaultTolerantPeerService<M extends Writable> {
+
+  /**
+   * This is called once the peer is initialized.
+   * 
+   * @throws Exception
+   */
+  public TaskStatus.State onPeerInitialized(TaskStatus.State state)
+      throws Exception;
+
+  /**
+   * This function is called before all the peers go into global sync.
+   * 
+   * @throws Exception
+   */
+  public void beforeBarrier() throws Exception;
+
+  /**
+   * This function is called after the peers enter the barrier but before they
+   * initiate leaving the barrier.
+   * 
+   * @throws Exception
+   */
+  public void duringBarrier() throws Exception;
+
+  /**
+   * This function is called every time the peer completes the global
+   * synchronization.
+   * 
+   * @throws Exception
+   */
+  public void afterBarrier() throws Exception;
+
+}

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Sun Aug  5 11:07:48 2012
@@ -22,7 +22,9 @@ import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.Map.Entry;
+import java.util.Queue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,6 +33,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.BSPPeerImpl;
 import org.apache.hama.bsp.TaskAttemptID;
@@ -61,6 +64,9 @@ public abstract class AbstractMessageMan
   // the task attempt id
   protected TaskAttemptID attemptId;
 
+  // List of listeners for all the sent messages
+  protected Queue<MessageEventListener<M>> messageListenerQueue;
+
   /*
    * (non-Javadoc)
    * @see org.apache.hama.bsp.message.MessageManager#init(org.apache.hama.bsp.
@@ -70,12 +76,14 @@ public abstract class AbstractMessageMan
   @Override
   public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
       Configuration conf, InetSocketAddress peerAddress) {
+    this.messageListenerQueue = new LinkedList<MessageEventListener<M>>();
     this.attemptId = attemptId;
     this.peer = peer;
     this.conf = conf;
     this.peerAddress = peerAddress;
     localQueue = getQueue();
     localQueueForNextIteration = getSynchronizedQueue();
+    
   }
 
   /*
@@ -84,18 +92,22 @@ public abstract class AbstractMessageMan
    */
   @Override
   public void close() {
-    Collection<MessageQueue<M>> values = outgoingQueues.values();
-    for (MessageQueue<M> msgQueue : values) {
-      msgQueue.close();
-    }
-    localQueue.close();
-    // remove possible disk queues from the path
     try {
-      FileSystem.get(conf).delete(
-          DiskQueue.getQueueDir(conf, attemptId,
-              conf.get(DiskQueue.DISK_QUEUE_PATH_KEY)), true);
-    } catch (IOException e) {
-      LOG.warn("Queue dir couldn't be deleted");
+      Collection<MessageQueue<M>> values = outgoingQueues.values();
+      for (MessageQueue<M> msgQueue : values) {
+        msgQueue.close();
+      }
+      localQueue.close();
+      // remove possible disk queues from the path
+      try {
+        FileSystem.get(conf).delete(
+            DiskQueue.getQueueDir(conf, attemptId,
+                conf.get(DiskQueue.DISK_QUEUE_PATH_KEY)), true);
+      } catch (IOException e) {
+        LOG.warn("Queue dir couldn't be deleted");
+      }
+    } finally {
+      notifyClose();
     }
 
   }
@@ -139,6 +151,7 @@ public abstract class AbstractMessageMan
     localQueue = localQueueForNextIteration.getMessageQueue();
     localQueue.prepareRead();
     localQueueForNextIteration = getSynchronizedQueue();
+    notifyInit();
   }
 
   /*
@@ -163,6 +176,7 @@ public abstract class AbstractMessageMan
     queue.add(msg);
     peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_SENT, 1L);
     outgoingQueues.put(targetPeerAddress, queue);
+    notifySentMessage(peerName, msg);
   }
 
   /*
@@ -206,4 +220,68 @@ public abstract class AbstractMessageMan
     this.conf = conf;
   }
 
+  private void notifySentMessage(String peerName, M message) {
+    Iterator<MessageEventListener<M>> iterator = this.messageListenerQueue
+        .iterator();
+    while (iterator.hasNext()) {
+      iterator.next().onMessageSent(peerName, message);
+    }
+  }
+
+  private void notifyReceivedMessage(M message) throws IOException {
+    Iterator<MessageEventListener<M>> iterator = this.messageListenerQueue
+        .iterator();
+    while (iterator.hasNext()) {
+      iterator.next().onMessageReceived(message);
+    }
+  }
+
+  private void notifyInit() {
+    Iterator<MessageEventListener<M>> iterator = this.messageListenerQueue
+        .iterator();
+    while (iterator.hasNext()) {
+      iterator.next().onInitialized();
+    }
+  }
+
+  private void notifyClose() {
+    Iterator<MessageEventListener<M>> iterator = this.messageListenerQueue
+        .iterator();
+    while (iterator.hasNext()) {
+      iterator.next().onClose();
+    }
+  }
+
+  
+
+  @Override
+  public void registerListener(MessageEventListener<M> listener)
+      throws IOException {
+    if(listener != null)
+      this.messageListenerQueue.add(listener);
+    
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle) throws IOException{
+    for (Writable message : bundle.getMessages()) {
+      loopBackMessage((M)message);
+    }
+    
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void loopBackMessage(Writable message) throws IOException{
+    this.localQueueForNextIteration.add((M)message);
+    peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, 1L);
+    notifyReceivedMessage((M)message);
+    
+  }
+  
+  
+  
+  
+
 }

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java Sun Aug  5 11:07:48 2012
@@ -25,7 +25,6 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
-import java.util.Iterator;
 
 import org.apache.avro.AvroRemoteException;
 import org.apache.avro.ipc.NettyServer;
@@ -61,14 +60,8 @@ public final class AvroMessageManagerImp
     server.close();
   }
 
-  public void put(BSPMessageBundle<M> messages) {
-    peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
-        messages.getMessages().size());
-    Iterator<M> iterator = messages.getMessages().iterator();
-    while (iterator.hasNext()) {
-      this.localQueueForNextIteration.add(iterator.next());
-      iterator.remove();
-    }
+  public void put(BSPMessageBundle<M> messages) throws IOException {
+    this.loopBackMessages(messages);
   }
 
   @SuppressWarnings("unchecked")
@@ -139,5 +132,4 @@ public final class AvroMessageManagerImp
       return ByteBuffer.wrap(data);
     }
   }
-
 }

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java Sun Aug  5 11:07:48 2012
@@ -17,6 +17,8 @@
  */
 package org.apache.hama.bsp.message;
 
+import java.io.IOException;
+
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
@@ -35,7 +37,7 @@ public interface HadoopMessageManager<M 
    * 
    * @param msg
    */
-  public void put(M msg);
+  public void put(M msg) throws IOException;
 
   /**
    * This method puts a messagebundle for the next iteration. Accessed
@@ -43,7 +45,7 @@ public interface HadoopMessageManager<M 
    * 
    * @param messages
    */
-  public void put(BSPMessageBundle<M> messages);
+  public void put(BSPMessageBundle<M> messages) throws IOException;
 
   /**
    * This method puts a compressed message bundle for the next iteration.
@@ -51,6 +53,6 @@ public interface HadoopMessageManager<M 
    * 
    * @param compMsgBundle
    */
-  public void put(BSPCompressedBundle compMsgBundle);
+  public void put(BSPCompressedBundle compMsgBundle) throws IOException;
 
 }

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java Sun Aug  5 11:07:48 2012
@@ -29,7 +29,6 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RPC.Server;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.BSPPeer;
-import org.apache.hama.bsp.BSPPeerImpl;
 import org.apache.hama.bsp.TaskAttemptID;
 import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
 import org.apache.hama.ipc.HamaRPCProtocolVersion;
@@ -115,24 +114,19 @@ public final class HadoopMessageManagerI
   }
 
   @Override
-  public final void put(M msg) {
-    this.localQueueForNextIteration.add(msg);
-    peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, 1L);
+  public final void put(M msg) throws IOException {
+    loopBackMessage(msg);
   }
 
   @Override
-  public final void put(BSPMessageBundle<M> messages) {
-    for (M message : messages.getMessages()) {
-      this.localQueueForNextIteration.add(message);
-    }
+  public final void put(BSPMessageBundle<M> messages) throws IOException {
+    loopBackMessages(messages);
   }
 
   @Override
-  public final void put(BSPCompressedBundle compMsgBundle) {
+  public final void put(BSPCompressedBundle compMsgBundle) throws IOException {
     BSPMessageBundle<M> bundle = compressor.decompressBundle(compMsgBundle);
-    for (M message : bundle.getMessages()) {
-      this.localQueueForNextIteration.add(message);
-    }
+    loopBackMessages(bundle);
   }
 
   @Override
@@ -141,4 +135,5 @@ public final class HadoopMessageManagerI
     return versionID;
   }
 
+  
 }

Added: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/MessageEventListener.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/MessageEventListener.java?rev=1369551&view=auto
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/MessageEventListener.java (added)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/MessageEventListener.java Sun Aug  5 11:07:48 2012
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+public interface MessageEventListener<M> {
+
+  /**
+   * The kinds of events a message manager can raise.
+   *
+   */
+  public static enum MessageManagerEvent {
+    INITIALIZED, MESSAGE_SENT, MESSAGE_RECEIVED, CLOSE
+  }
+
+  /**
+   * The function to handle the event when the queue is initialized.
+   */
+  void onInitialized();
+
+  /**
+   * The function to handle the event when a message is sent.
+   * <code>message</code> should not be modified.
+   * 
+   * @param peerName Name of the peer the message is sent to.
+   * @param message The message sent.
+   */
+  void onMessageSent(String peerName, final M message);
+
+  /**
+   * The function to handle the event when a message is received.
+   * <code>message</code> should not be modified.
+   * 
+   * @param message The message received.
+   */
+  void onMessageReceived(final M message);
+
+  /**
+   * The function to handle the event when the queue is closed.
+   */
+  void onClose();
+
+}

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java Sun Aug  5 11:07:48 2012
@@ -34,7 +34,7 @@ import org.apache.hama.bsp.TaskAttemptID
  * 
  */
 public interface MessageManager<M extends Writable> {
-  
+
   public static final String QUEUE_TYPE_CLASS = "hama.messenger.queue.class";
 
   /**
@@ -96,4 +96,25 @@ public interface MessageManager<M extend
    */
   public int getNumCurrentMessages();
 
+  /**
+   * Send the messages to self to receive in the next superstep.
+   */
+  public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle) throws IOException;
+  
+  /**
+   * Send the message to self to receive in the next superstep.
+   */
+  public void loopBackMessage(Writable message) throws IOException;
+
+  /**
+   * Register a listener for the events in message manager.
+   * 
+   * @param listener <code>MessageEventListener</code> object that processes the
+   *          messages sent to remote peer.
+   * @throws IOException
+   */
+  public void registerListener(MessageEventListener<M> listener)
+      throws IOException;
+  
+
 }

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java Sun Aug  5 11:07:48 2012
@@ -17,6 +17,8 @@
  */
 package org.apache.hama.bsp.sync;
 
+import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hama.bsp.BSPJobID;
 import org.apache.hama.bsp.TaskAttemptID;
@@ -100,7 +102,7 @@ public abstract class BSPPeerSyncClient 
    * 
    * @throws InterruptedException
    */
-  public abstract void close() throws InterruptedException;
+  public abstract void close() throws IOException;
 
 
 }

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/MasterSyncClient.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/MasterSyncClient.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/MasterSyncClient.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/MasterSyncClient.java Sun Aug  5 11:07:48 2012
@@ -31,28 +31,23 @@ public interface MasterSyncClient extend
    * 
    * @param conf The configuration parameters to initialize the client.
    */
-  public abstract void init(HamaConfiguration conf);
+  public void init(HamaConfiguration conf);
   
   /**
    * Clears all information stored.
    */
-  public abstract void clear();
+  public void clear();
   
   /**
    * Register a newly added job 
    * @param string
    */
-  public abstract void registerJob(String string);
+  public void registerJob(String string);
 
   /**
    * Deregister the job from the system.
    * @param string
    */
-  public abstract void deregisterJob(String string);
+  public void deregisterJob(String string);
     
-  /**
-   * Closes the client.
-   */
-  public abstract void close();
-
 }

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java Sun Aug  5 11:07:48 2012
@@ -100,12 +100,4 @@ public interface PeerSyncClient extends 
    */
   public void stopServer();
 
-  /**
-   * This method should close all used resources, e.G. a ZooKeeper instance.
-   * 
-   * @throws InterruptedException
-   */
-  public void close() throws InterruptedException;
-
-
 }

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java Sun Aug  5 11:07:48 2012
@@ -17,6 +17,8 @@
  */
 package org.apache.hama.bsp.sync;
 
+import java.io.IOException;
+
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPJobID;
 
@@ -56,7 +58,7 @@ public interface SyncClient {
    * @return the value if found. Returns null if there was any error of if there
    * was no value stored for the key.
    */
-  public Writable getInformation(String key, Class<? extends Writable> classType);
+  public boolean getInformation(String key, Writable valueHolder);
 
   /**
    * Store new key in key set.
@@ -95,4 +97,17 @@ public interface SyncClient {
   public boolean registerListener(String key, SyncEvent event,
       SyncEventListener listener);
 
+  /**
+   * Delete the key and the information stored under it.
+   * @param key The key to delete.
+   * @param listener
+   * @return true if the key was deleted, false otherwise.
+   */
+  public boolean remove(String key, SyncEventListener listener);
+  
+  /**
+   * Closes this client.
+   */
+  public void close() throws IOException;
+  
 }

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncBSPMasterClient.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncBSPMasterClient.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncBSPMasterClient.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncBSPMasterClient.java Sun Aug  5 11:07:48 2012
@@ -18,7 +18,6 @@
 package org.apache.hama.bsp.sync;
 
 import java.io.IOException;
-import java.util.ArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -81,45 +80,6 @@ public class ZKSyncBSPMasterClient exten
 
   }
 
-  /**
-   * Clears all sub-children of node bspRoot
-   */
-  private void clearZKNodes() {
-    try {
-      Stat s = zk.exists(bspRoot, false);
-      if (s != null) {
-        clearZKNodes(bspRoot);
-      }
-
-    } catch (Exception e) {
-      LOG.warn("Could not clear zookeeper nodes.", e);
-    }
-  }
-
-  /**
-   * Clears all sub-children of node rooted at path.
-   * 
-   * @param path
-   * @throws InterruptedException
-   * @throws KeeperException
-   */
-  private void clearZKNodes(String path) throws KeeperException,
-      InterruptedException {
-    ArrayList<String> list = (ArrayList<String>) zk.getChildren(path, false);
-
-    if (list.size() == 0) {
-      return;
-
-    } else {
-      for (String node : list) {
-        clearZKNodes(path + "/" + node);
-        LOG.info("Deleting " + path + "/" + node);
-        zk.delete(path + "/" + node, -1); // delete any version of this
-        // node.
-      }
-    }
-  }
-
   private void createJobRoot(String string) {
     writeNode(string, null, true, null);
   }
@@ -164,4 +124,9 @@ public class ZKSyncBSPMasterClient exten
     LOG.debug("Processing event type " + arg0.getType().toString());
 
   }
+
+  public ZooKeeper getZK() {
+    return this.zk;
+  }
+
 }

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncClient.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncClient.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncClient.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncClient.java Sun Aug  5 11:07:48 2012
@@ -22,6 +22,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -31,9 +32,9 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPJobID;
 import org.apache.hama.bsp.TaskAttemptID;
-import org.apache.hama.util.ReflectionUtils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
@@ -77,11 +78,14 @@ public abstract class ZKSyncClient imple
    *         created.
    */
   protected String getNodeName(TaskAttemptID taskId, long superstep) {
-    return bspRoot + "/" + taskId.getJobID().toString() + "/" + superstep + "/"
-        + taskId.toString();
+    return constructKey(taskId.getJobID(), "sync", "" + superstep,
+        taskId.toString());
+    //
+    // bspRoot + "/" + taskId.getJobID().toString() + "/" + superstep + "/"
+    // + taskId.toString();
   }
-  
-  private String correctKey(String key){
+
+  private String correctKey(String key) {
     if (!key.startsWith("/")) {
       key = "/" + key;
     }
@@ -114,7 +118,7 @@ public abstract class ZKSyncClient imple
    * @throws InterruptedException
    */
   protected Stat getStat(final String path) throws KeeperException,
-  InterruptedException {
+      InterruptedException {
     synchronized (zk) {
       return zk.exists(path, false);
     }
@@ -125,10 +129,17 @@ public abstract class ZKSyncClient imple
       InterruptedException {
 
     synchronized (zk) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Checking node " + path);
+      }
       Stat s = zk.exists(path, false);
       if (null == s) {
         try {
           zk.create(path, data, Ids.OPEN_ACL_UNSAFE, mode);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Created node " + path);
+          }
+
         } catch (KeeperException.NodeExistsException nee) {
           LOG.debug("Ignore because znode may be already created at " + path,
               nee);
@@ -170,30 +181,27 @@ public abstract class ZKSyncClient imple
 
   /**
    * Utility function to read Writable object value from byte array.
+   * 
    * @param data The byte array
    * @param classType The Class object of expected Writable object.
-   * @return The instance of Writable object. 
+   * @return true if data was non-null and was read into valueHolder.
    * @throws IOException
    */
-  protected Writable getValueFromBytes(
-      byte[] data, Class<? extends Writable> classType) throws IOException{
-    Writable value = null;
+  protected boolean getValueFromBytes(byte[] data,
+      Writable valueHolder) throws IOException {
     if (data != null) {
       ByteArrayInputStream istream = new ByteArrayInputStream(data);
-      value = ReflectionUtils
-          .newInstance(classType, new Object[0]);
       DataInputStream diStream = new DataInputStream(istream);
       try {
-        value.readFields(diStream);
-      }
-      finally {
+        valueHolder.readFields(diStream);
+      } finally {
         diStream.close();
       }
+      return true;
     }
-    return value;
+    return false;
   }
 
-
   /**
    * Read value stored in the Zookeeper node.
    * 
@@ -202,23 +210,21 @@ public abstract class ZKSyncClient imple
    * @return The Writable object constructed from the value read from the
    *         Zookeeper node.
    */
-  protected Writable extractData(String path,
-      Class<? extends Writable> classType) {
+  protected boolean extractData(String path,
+      Writable valueHolder) {
     try {
       Stat stat = getStat(path);
       if (stat != null) {
         byte[] data = this.zk.getData(path, false, stat);
-        Writable value = null;
-        try{
-          value = getValueFromBytes(data, classType);
-        }
-        catch (IOException e) {
+        try {
+          getValueFromBytes(data, valueHolder);
+        } catch (IOException e) {
           LOG.error(
               new StringBuffer(200).append("Error getting data from path ")
-              .append(path).toString(), e);
-          value = null;
+                  .append(path).toString(), e);
+          return false;
         }
-        return value;
+        return true;
       }
 
     } catch (KeeperException e) {
@@ -230,7 +236,7 @@ public abstract class ZKSyncClient imple
           .append(path).toString(), e);
 
     }
-    return null;
+    return false;
 
   }
 
@@ -277,7 +283,7 @@ public abstract class ZKSyncClient imple
               watcher);
         }
         pathBuffer.append("/")
-        .append(pathComponents[pathComponents.length - 1]);
+            .append(pathComponents[pathComponents.length - 1]);
         CreateMode mode = CreateMode.EPHEMERAL;
         if (persistent) {
           mode = CreateMode.PERSISTENT;
@@ -331,10 +337,10 @@ public abstract class ZKSyncClient imple
   }
 
   @Override
-  public Writable getInformation(String key, Class<? extends Writable> classType) {
+  public boolean getInformation(String key, Writable valueHolder) {
     key = correctKey(key);
     final String path = key;
-    return extractData(path, classType);
+    return extractData(path, valueHolder);
   }
 
   @Override
@@ -426,7 +432,74 @@ public abstract class ZKSyncClient imple
     return children;
   }
 
-  
-  
+  /**
+   * Clears all sub-children of node bspRoot
+   */
+  protected void clearZKNodes() {
+    try {
+      Stat s = zk.exists(bspRoot, false);
+      if (s != null) {
+        clearZKNodes(bspRoot);
+      }
+
+    } catch (Exception e) {
+      LOG.warn("Could not clear zookeeper nodes.", e);
+    }
+  }
+
+  /**
+   * Clears all sub-children of node rooted at path.
+   * 
+   * @param path
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  protected void clearZKNodes(String path) throws KeeperException,
+      InterruptedException {
+    ArrayList<String> list = (ArrayList<String>) zk.getChildren(path, false);
+
+    if (list.size() == 0) {
+      return;
+
+    } else {
+      for (String node : list) {
+        clearZKNodes(path + "/" + node);
+        LOG.info("Deleting " + path + "/" + node);
+        zk.delete(path + "/" + node, -1); // delete any version of this
+        // node.
+      }
+    }
+  }
+
+  @Override
+  public void process(WatchedEvent arg0) {
+  }
+
+  @Override
+  public boolean remove(String key, SyncEventListener listener) {
+    key = correctKey(key);
+    try {
+      clearZKNodes(key);
+      this.zk.delete(key, -1);
+      return true;
+    } catch (KeeperException e) {
+      LOG.error("Error deleting key " + key);
+    } catch (InterruptedException e) {
+      LOG.error("Error deleting key " + key);
+    }
+    return false;
+
+  }
+
+  @Override
+  public void close() throws IOException {
+
+    try {
+      this.zk.close();
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+
+  }
 
 }

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java Sun Aug  5 11:07:48 2012
@@ -17,6 +17,7 @@
  */
 package org.apache.hama.bsp.sync;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Collections;
 import java.util.List;
@@ -91,9 +92,10 @@ public class ZooKeeperSyncClientImpl ext
 
     try {
       synchronized (zk) {
-        final String pathToJobIdZnode = bspRoot + "/"
-            + taskId.getJobID().toString();
-        final String pathToSuperstepZnode = pathToJobIdZnode + "/" + superstep;
+
+        final String pathToSuperstepZnode = 
+            constructKey(taskId.getJobID(), "sync", ""+superstep);
+        
         writeNode(pathToSuperstepZnode, null, true, null);
         BarrierWatcher barrierWatcher = new BarrierWatcher();
         // this is really needed to register the barrier watcher, don't remove
@@ -140,8 +142,10 @@ public class ZooKeeperSyncClientImpl ext
   public void leaveBarrier(final BSPJobID jobId, final TaskAttemptID taskId,
       final long superstep) throws SyncException {
     try {
-      final String pathToSuperstepZnode = bspRoot + "/"
-          + taskId.getJobID().toString() + "/" + superstep;
+//      final String pathToSuperstepZnode = bspRoot + "/"
+//          + taskId.getJobID().toString() + "/" + superstep;
+      final String pathToSuperstepZnode = 
+          constructKey(taskId.getJobID(), "sync", ""+superstep);
       while (true) {
         List<String> znodes = zk.getChildren(pathToSuperstepZnode, false);
         LOG.debug("leaveBarrier() !!! checking znodes contnains /ready node or not: at superstep:"
@@ -278,11 +282,16 @@ public class ZooKeeperSyncClientImpl ext
         for (String s : allPeers) {
           byte[] data = zk.getData(constructKey(taskId.getJobID(), "peers", s),
               this, null);
-          TaskAttemptID thatTask = (TaskAttemptID) getValueFromBytes(data,
-              TaskAttemptID.class);
-          LOG.debug("TASK mapping from zookeeper: " + thatTask + " ID:"
-              + thatTask.getTaskID().getId() + " : " + s);
-          sortedMap.put(thatTask.getTaskID().getId(), s);
+          TaskAttemptID thatTask = new TaskAttemptID(); 
+          boolean result = getValueFromBytes(data, thatTask);
+
+          if(result){
+            LOG.debug("TASK mapping from zookeeper: " + thatTask + " ID:"
+                + thatTask.getTaskID().getId() + " : " + s);
+            sortedMap.put(thatTask.getTaskID().getId(), s);
+          }
+
+
         }
 
       } catch (Exception e) {
@@ -303,8 +312,13 @@ public class ZooKeeperSyncClientImpl ext
   }
 
   @Override
-  public void close() throws InterruptedException {
-    zk.close();
+  public void close() throws IOException {
+    try{
+      zk.close();
+    }
+    catch(InterruptedException e){
+      throw new IOException(e);
+    }
   }
 
   @Override

Added: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/BSPResource.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/BSPResource.java?rev=1369551&view=auto
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/BSPResource.java (added)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/BSPResource.java Sun Aug  5 11:07:48 2012
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.taskallocation;
+
+import org.apache.hama.bsp.TaskInProgress;
+
+/**
+ * <code>BSPResource</code> defines a resource entity that would be used as a
+ * factor for allocating tasks on groom-servers.
+ */
+public abstract class BSPResource {
+
+  /**
+   * Returns the list of grooms on which the current resource is available or
+   * local or is best chosen for the task.
+   * 
+   * @param tip The <code>TaskInProgress</code> representing the task to
+   *          schedule.
+   * @return The list of groomserver host names.
+   */
+  public abstract String[] getGrooms(TaskInProgress tip);
+
+}

Added: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java?rev=1369551&view=auto
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java (added)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java Sun Aug  5 11:07:48 2012
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.taskallocation;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.BSPJobClient.RawSplit;
+import org.apache.hama.bsp.GroomServerStatus;
+import org.apache.hama.bsp.TaskInProgress;
+
+/**
+ * <code>BestEffortDataLocalTaskAllocator</code> is a simple task allocator that
+ * takes in only the data locality as a constraint for allocating tasks. It
+ * makes the best attempt to schedule task on the groom server with the input
+ * split. If the aforesaid is not possible, it selects any other available groom
+ * to allocate tasks on.
+ */
+public class BestEffortDataLocalTaskAllocator implements TaskAllocationStrategy {
+
+  Log LOG = LogFactory.getLog(BestEffortDataLocalTaskAllocator.class);
+
+  @Override
+  public void initialize(Configuration conf) {
+  }
+
+  /**
+   * Returns the first groom that has a slot to schedule a task on.
+   * 
+   * @param grooms
+   * @param tasksInGroomMap
+   * @return
+   */
+  private String getAnyGroomToSchedule(Map<String, GroomServerStatus> grooms,
+      Map<GroomServerStatus, Integer> tasksInGroomMap) {
+
+    Iterator<String> groomIter = grooms.keySet().iterator();
+    while (groomIter.hasNext()) {
+      GroomServerStatus groom = grooms.get(groomIter.next());
+      if (groom == null)
+        continue;
+      Integer taskInGroom = tasksInGroomMap.get(groom);
+      taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
+      if (taskInGroom < groom.getMaxTasks()) {
+        return groom.getGroomHostName();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * From the set of grooms given, returns the groom on which a task could be
+   * scheduled on.
+   * 
+   * @param grooms
+   * @param tasksInGroomMap
+   * @param possibleLocations
+   * @return
+   */
+  private String getGroomToSchedule(Map<String, GroomServerStatus> grooms,
+      Map<GroomServerStatus, Integer> tasksInGroomMap,
+      String[] possibleLocations) {
+
+    for (int i = 0; i < possibleLocations.length; ++i) {
+      String location = possibleLocations[i];
+      GroomServerStatus groom = grooms.get(location);
+      if (groom == null)
+        continue;
+      Integer taskInGroom = tasksInGroomMap.get(groom);
+      taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
+      if (taskInGroom < groom.getMaxTasks()
+          && location.equals(groom.getGroomHostName())) {
+        return groom.getGroomHostName();
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public GroomServerStatus getGroomToAllocate(
+      Map<String, GroomServerStatus> groomStatuses, String[] selectedGrooms,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap,
+      BSPResource[] resources, TaskInProgress taskInProgress) {
+    if (!taskInProgress.canStartTask())
+      return null;
+
+    String groomName = null;
+    if (selectedGrooms != null) {
+      groomName = getGroomToSchedule(groomStatuses, taskCountInGroomMap,
+          selectedGrooms);
+    }
+
+    if (groomName == null) {
+      groomName = getAnyGroomToSchedule(groomStatuses, taskCountInGroomMap);
+    }
+
+    if (groomName != null) {
+      return groomStatuses.get(groomName);
+    }
+
+    return null;
+  }
+
+  /**
+   * Select grooms that has the block of data locally stored on the groom
+   * server.
+   */
+  @Override
+  public String[] selectGrooms(Map<String, GroomServerStatus> groomStatuses,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap,
+      BSPResource[] resources, TaskInProgress taskInProgress) {
+    if (!taskInProgress.canStartTask()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Cannot start task based on id");
+      }
+      return new String[0];
+    }
+
+    RawSplit rawSplit = taskInProgress.getFileSplit();
+    if (rawSplit != null) {
+      return rawSplit.getLocations();
+    }
+    return null;
+  }
+
+  /**
+   * This operation is not supported.
+   */
+  @Override
+  public Set<GroomServerStatus> getGroomsToAllocate(
+      Map<String, GroomServerStatus> groomStatuses, String[] selectedGrooms,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap,
+      BSPResource[] resources, TaskInProgress taskInProgress) {
+    throw new UnsupportedOperationException(
+        "This API is not supported for the called API function call.");
+  }
+
+}

Added: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/RawSplitResource.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/RawSplitResource.java?rev=1369551&view=auto
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/RawSplitResource.java (added)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/RawSplitResource.java Sun Aug  5 11:07:48 2012
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.taskallocation;
+
+import org.apache.hama.bsp.BSPJobClient.RawSplit;
+import org.apache.hama.bsp.TaskInProgress;
+
+/**
+ * <code>RawSplitResource</code> defines the data block resource that could be
+ * used to find which groom to schedule for data-locality.
+ */
+public class RawSplitResource extends BSPResource {
+
+  /** The split behind this resource; null if created without a split. */
+  private final RawSplit split;
+
+  /**
+   * Creates a resource that is not bound to any split.
+   */
+  public RawSplitResource() {
+    this(null);
+  }
+
+  /**
+   * Initialize the resource with data block split information.
+   * 
+   * @param split The data-split provided by <code>BSPJobClient</code>
+   */
+  public RawSplitResource(RawSplit split) {
+    this.split = split;
+  }
+
+  /**
+   * Returns the locations reported by the split of this resource.
+   * 
+   * @param tip not used by this implementation.
+   * @return the locations of the split, or an empty array if this resource
+   *         was created without a split.
+   */
+  @Override
+  public String[] getGrooms(TaskInProgress tip) {
+    // The no-arg constructor leaves the split unset; answer "no locations"
+    // instead of failing with a NullPointerException.
+    if (split == null) {
+      return new String[0];
+    }
+    return split.getLocations();
+  }
+
+}

Added: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/TaskAllocationStrategy.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/TaskAllocationStrategy.java?rev=1369551&view=auto
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/TaskAllocationStrategy.java (added)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/TaskAllocationStrategy.java Sun Aug  5 11:07:48 2012
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.taskallocation;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.GroomServerStatus;
+import org.apache.hama.bsp.TaskInProgress;
+
+/**
+ * <code>TaskAllocationStrategy</code> defines the behavior of task allocation
+ * strategy that is employed by a Hama job to schedule tasks in GroomServers in
+ * Hama cluster. The function <code>selectGrooms</code> is responsible to define
+ * the strategy to select the grooms. This list of grooms could be used in the
+ * functions <code>getGroomToAllocate</code> or <code>getGroomsToAllocate</code>
+ * in the parameter <code>selectedGrooms</code>. The two functions are not given
+ * the responsibility to select grooms because these functions could also be
+ * handling the task of allocating tasks on any other restricted set of grooms
+ * that the caller invokes them for.
+ * 
+ */
+
+@Unstable
+public interface TaskAllocationStrategy {
+
+  /**
+   * Initializes the <code>TaskAllocationStrategy</code> instance.
+   * 
+   * @param conf Hama configuration
+   */
+  public abstract void initialize(Configuration conf);
+
+  /**
+   * Defines the task-allocation strategy to select the grooms based on the
+   * resource constraints and the task related restrictions posed. This function
+   * could be used to populate the <code>selectedGrooms</code> argument in the
+   * functions <code>getGroomToAllocate</code> and
+   * <code>getGroomsToAllocate</code>
+   * 
+   * @param groomStatuses The map of groom-name to
+   *          <code>GroomServerStatus</code> object for all known grooms.
+   * @param taskCountInGroomMap Map of count of tasks in groom (To be deprecated
+   *          soon)
+   * @param resources The <code>BSPResource</code> objects supplied for the
+   *          allocation decision.
+   * @param taskInProgress The <code>TaskInProgress</code> object for the task.
+   * @return An array of hostnames where the tasks could be allocated on.
+   */
+  @Unstable
+  public abstract String[] selectGrooms(
+      Map<String, GroomServerStatus> groomStatuses,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap,
+      BSPResource[] resources, TaskInProgress taskInProgress);
+
+  /**
+   * Returns the best groom to run the task on based on the set of grooms
+   * provided. The designer of the class can choose to populate the
+   * <code>selectedGrooms</code> value with the function
+   * <code>selectGrooms</code>
+   * 
+   * @param groomStatuses The map of groom-name to
+   *          <code>GroomServerStatus</code> object for all known grooms.
+   * @param selectedGrooms An array of selected groom host-names to select from.
+   * @param taskCountInGroomMap Map of count of tasks in groom (To be deprecated
+   *          soon)
+   * @param resources The <code>BSPResource</code> objects supplied for the
+   *          allocation decision.
+   * @param taskInProgress The <code>TaskInProgress</code> object for the task.
+   * @return The <code>GroomServerStatus</code> of the selected groom. Returns
+   *         null if no groom could be found.
+   */
+  @Unstable
+  public abstract GroomServerStatus getGroomToAllocate(
+      Map<String, GroomServerStatus> groomStatuses, String[] selectedGrooms,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap,
+      BSPResource[] resources, TaskInProgress taskInProgress);
+
+  /**
+   * Returns the best grooms to run the task on based on the set of grooms
+   * provided. The designer of the class can choose to populate the
+   * <code>selectedGrooms</code> value with the function
+   * <code>selectGrooms</code>
+   * 
+   * @param groomStatuses The map of groom-name to
+   *          <code>GroomServerStatus</code> object for all known grooms.
+   * @param selectedGrooms An array of selected groom host-names to select from.
+   * @param taskCountInGroomMap Map of count of tasks in groom (To be deprecated
+   *          soon)
+   * @param resources The <code>BSPResource</code> objects supplied for the
+   *          allocation decision.
+   * @param taskInProgress The <code>TaskInProgress</code> object for the task.
+   * @return The <code>GroomServerStatus</code> objects of the selected grooms
+   *         where the task could be allocated. Returns null if no groom could
+   *         be found.
+   */
+  @Unstable
+  public abstract Set<GroomServerStatus> getGroomsToAllocate(
+      Map<String, GroomServerStatus> groomStatuses, String[] selectedGrooms,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap,
+      BSPResource[] resources, TaskInProgress taskInProgress);
+}