You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by su...@apache.org on 2012/08/05 13:07:49 UTC

svn commit: r1369551 [3/3] - in /hama/branches/HAMA-505-branch: conf/ core/src/main/java/org/apache/hama/ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/ft/ core/src/main/java/org/apache/hama/bsp/message/ core/src/main/j...

Modified: hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original)
+++ hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Sun Aug  5 11:07:48 2012
@@ -17,7 +17,18 @@
  */
 package org.apache.hama.bsp;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 
 import junit.framework.TestCase;
 
@@ -25,19 +36,30 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.Constants;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.TestBSPTaskFaults.MinimalGroomServer;
-import org.apache.hama.bsp.message.type.ByteMessage;
-import org.apache.hama.bsp.sync.SyncClient;
+import org.apache.hama.bsp.Counters.Counter;
+import org.apache.hama.bsp.ft.AsyncRcvdMsgCheckpointImpl;
+import org.apache.hama.bsp.ft.FaultTolerantPeerService;
+import org.apache.hama.bsp.message.MessageEventListener;
+import org.apache.hama.bsp.message.MessageManager;
+import org.apache.hama.bsp.message.MessageQueue;
+import org.apache.hama.bsp.sync.BSPPeerSyncClient;
+import org.apache.hama.bsp.sync.PeerSyncClient;
+import org.apache.hama.bsp.sync.SyncEvent;
+import org.apache.hama.bsp.sync.SyncEventListener;
+import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.bsp.sync.SyncServiceFactory;
-import org.apache.hama.ipc.BSPPeerProtocol;
-import org.apache.hama.ipc.HamaRPCProtocolVersion;
 import org.apache.hama.util.BSPNetUtils;
+import org.apache.hama.util.KeyValuePair;
 
 public class TestCheckpoint extends TestCase {
 
@@ -45,130 +67,578 @@ public class TestCheckpoint extends Test
 
   static final String checkpointedDir = "checkpoint/job_201110302255_0001/0/";
 
-  @SuppressWarnings({ "unchecked", "rawtypes" })
+  /**
+   * In-memory {@link MessageManager} stub for driving the fault-tolerance
+   * service without networking. Tests inject incoming messages through
+   * {@link #addMessage(Text)}; bundles handed back via
+   * {@link #loopBackMessages} are retained for inspection through
+   * {@link #getLoopbackBundle()}. Sending and transfers are no-ops.
+   */
+  public static class TestMessageManager implements MessageManager<Text> {
+
+    List<Text> messageQueue = new ArrayList<Text>();
+    BSPMessageBundle<Text> loopbackBundle = new BSPMessageBundle<Text>();
+    // Position of the next message returned by getCurrentMessage(). An index
+    // (rather than a cached Iterator) stays valid when addMessage() appends
+    // after reading has started, instead of throwing
+    // ConcurrentModificationException.
+    int readIndex = 0;
+    MessageEventListener<Text> listener;
+
+    @Override
+    public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, Text> peer,
+        Configuration conf, InetSocketAddress peerAddress) {
+      // Nothing to set up for the in-memory stub.
+    }
+
+    @Override
+    public void close() {
+      // No resources are held.
+    }
+
+    /**
+     * Returns the next message added via {@link #addMessage(Text)} that has
+     * not been returned yet, or {@code null} when none remain.
+     */
+    @Override
+    public Text getCurrentMessage() throws IOException {
+      if (readIndex < messageQueue.size()) {
+        return messageQueue.get(readIndex++);
+      }
+      return null;
+    }
+
+    @Override
+    public void send(String peerName, Text msg) throws IOException {
+    }
+
+    @Override
+    public void finishSendPhase() throws IOException {
+    }
+
+    @Override
+    public Iterator<Entry<InetSocketAddress, MessageQueue<Text>>> getMessageIterator() {
+      return null;
+    }
+
+    @Override
+    public void transfer(InetSocketAddress addr, BSPMessageBundle<Text> bundle)
+        throws IOException {
+      // Outgoing transfers are not simulated.
+    }
+
+    @Override
+    public void clearOutgoingQueues() {
+    }
+
+    /** Returns the total number of messages added, including ones already read. */
+    @Override
+    public int getNumCurrentMessages() {
+      return this.messageQueue.size();
+    }
+
+    /**
+     * Returns the bundle most recently passed to {@link #loopBackMessages}, or
+     * the initial empty bundle if none was.
+     */
+    public BSPMessageBundle<Text> getLoopbackBundle() {
+      return this.loopbackBundle;
+    }
+
+    /**
+     * Queues {@code message} as received and notifies the registered
+     * listener, if there is one.
+     */
+    public void addMessage(Text message) throws IOException {
+      this.messageQueue.add(message);
+      // Registering a listener is optional; without this guard, adding a
+      // message before registerListener() throws NullPointerException.
+      if (listener != null) {
+        listener.onMessageReceived(message);
+      }
+    }
+
+    // This manager is typed on Text, so looped-back bundles are expected to
+    // carry Text; the cast is unchecked.
+    @SuppressWarnings("unchecked")
+    @Override
+    public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle) {
+      this.loopbackBundle = (BSPMessageBundle<Text>) bundle;
+    }
+
+    @Override
+    public void loopBackMessage(Writable message) {
+    }
+
+    @Override
+    public void registerListener(MessageEventListener<Text> listener)
+        throws IOException {
+      this.listener = listener;
+    }
+
+  }
+
+  /**
+   * Minimal {@link BSPPeer} for exercising {@link AsyncRcvdMsgCheckpointImpl}
+   * without a cluster. Its only real behavior is counting supersteps and
+   * calling the fault-tolerance service's {@code afterBarrier()} hook on each
+   * {@link #sync()}; every other peer operation is an inert stub.
+   */
+  public static class TestBSPPeer implements
+      BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, Text> {
+
+    Configuration conf;
+    long superstepCount;
+    FaultTolerantPeerService<Text> fService;
+
+    /**
+     * Builds the peer and initializes its fault-tolerance service.
+     *
+     * @param superstep starting superstep; values {@code <= 0} start at 0
+     * @param state task state passed to {@code onPeerInitialized}
+     *          (testPeerRecovery passes {@code RECOVERING})
+     * @throws IllegalStateException if the fault-tolerance service cannot be
+     *           constructed or initialized
+     */
+    public TestBSPPeer(BSPJob job, Configuration conf, TaskAttemptID taskId,
+        Counters counters, long superstep, BSPPeerSyncClient syncClient,
+        MessageManager<Text> messenger, TaskStatus.State state) {
+      this.conf = conf;
+      this.superstepCount = superstep > 0 ? superstep : 0L;
+
+      try {
+        fService = (new AsyncRcvdMsgCheckpointImpl<Text>()).constructPeerFaultTolerance(
+            job, (BSPPeer<?, ?, ?, ?, Text>) this, syncClient, null, taskId,
+            superstep, conf, messenger);
+        this.fService.onPeerInitialized(state);
+      } catch (Exception e) {
+        // Fail fast: swallowing this would leave fService null and surface
+        // later as an unrelated NullPointerException in sync().
+        throw new IllegalStateException(
+            "Failed to initialize the fault tolerance service", e);
+      }
+    }
+
+    @Override
+    public void send(String peerName, Text msg) throws IOException {
+    }
+
+    /** Always returns a fresh {@code "data"} message. */
+    @Override
+    public Text getCurrentMessage() throws IOException {
+      return new Text("data");
+    }
+
+    @Override
+    public int getNumCurrentMessages() {
+      return 1;
+    }
+
+    /**
+     * Advances the superstep counter and invokes the fault-tolerance
+     * service's {@code afterBarrier()} hook. A failure in the hook is rethrown
+     * (wrapped in an {@link IOException}) so that a broken checkpoint fails
+     * the calling test instead of only being printed.
+     */
+    @Override
+    public void sync() throws IOException, SyncException, InterruptedException {
+      ++superstepCount;
+      try {
+        this.fService.afterBarrier();
+      } catch (Exception e) {
+        throw new IOException("afterBarrier failed at superstep "
+            + superstepCount, e);
+      }
+      LOG.info("After barrier " + superstepCount);
+    }
+
+    @Override
+    public long getSuperstepCount() {
+      return superstepCount;
+    }
+
+    // ---- Inert stubs: nothing below is used by the checkpoint tests. ----
+
+    @Override
+    public String getPeerName() {
+      return null;
+    }
+
+    @Override
+    public String getPeerName(int index) {
+      return null;
+    }
+
+    /** Fixed index 1; the checkpoint keys and paths checked by these tests end in 1. */
+    @Override
+    public int getPeerIndex() {
+      return 1;
+    }
+
+    @Override
+    public String[] getAllPeerNames() {
+      return null;
+    }
+
+    @Override
+    public int getNumPeers() {
+      return 0;
+    }
+
+    @Override
+    public void clear() {
+    }
+
+    @Override
+    public void write(NullWritable key, NullWritable value) throws IOException {
+    }
+
+    @Override
+    public boolean readNext(NullWritable key, NullWritable value)
+        throws IOException {
+      return false;
+    }
+
+    @Override
+    public KeyValuePair<NullWritable, NullWritable> readNext()
+        throws IOException {
+      return null;
+    }
+
+    @Override
+    public void reopenInput() throws IOException {
+    }
+
+    /** Returns null; this stub does not expose the configuration it holds. */
+    @Override
+    public Configuration getConfiguration() {
+      return null;
+    }
+
+    @Override
+    public Counter getCounter(Enum<?> name) {
+      return null;
+    }
+
+    @Override
+    public Counter getCounter(String group, String name) {
+      return null;
+    }
+
+    @Override
+    public void incrementCounter(Enum<?> key, long amount) {
+    }
+
+    @Override
+    public void incrementCounter(String group, String counter, long amount) {
+    }
+
+  }
+
+  /**
+   * In-memory stand-in for a ZooKeeper-backed {@link BSPPeerSyncClient}.
+   * Values live in a {@link HashMap}; {@link #getInformation} copies a stored
+   * value into the caller's holder through Writable serialization. Barrier,
+   * registration and listener calls are no-ops apart from logging.
+   */
+  public static class TempSyncClient extends BSPPeerSyncClient {
+
+    Map<String, Writable> valueMap = new HashMap<String, Writable>();
+
+    /** Builds {@code "<jobId>/<arg1>/.../<argN>/"}, including a trailing slash. */
+    @Override
+    public String constructKey(BSPJobID jobId, String... args) {
+      StringBuilder buffer = new StringBuilder(100);
+      buffer.append(jobId.toString()).append("/");
+      for (String arg : args) {
+        buffer.append(arg).append("/");
+      }
+      return buffer.toString();
+    }
+
+    /**
+     * Stores {@code value} under {@code key}, replacing any previous value.
+     * Values shaped like the (superstep, message count) {@link ArrayWritable}
+     * written by the checkpointer are logged field by field. Any other
+     * Writable is stored as-is rather than failing with a ClassCastException.
+     *
+     * @return always {@code true}
+     */
+    @Override
+    public boolean storeInformation(String key, Writable value,
+        boolean permanent, SyncEventListener listener) {
+      if (isStepCountPair(value)) {
+        Writable[] fields = ((ArrayWritable) value).get();
+        long step = ((LongWritable) fields[0]).get();
+        long count = ((LongWritable) fields[1]).get();
+        LOG.info("SyncClient Storing value step = " + step + " count = "
+            + count + " for key " + key);
+      } else {
+        LOG.info("SyncClient Storing value " + value + " for key " + key);
+      }
+      valueMap.put(key, value);
+      return true;
+    }
+
+    /** True if {@code value} is an ArrayWritable whose first two entries are LongWritables. */
+    private static boolean isStepCountPair(Writable value) {
+      if (!(value instanceof ArrayWritable)) {
+        return false;
+      }
+      Writable[] fields = ((ArrayWritable) value).get();
+      return fields != null && fields.length >= 2
+          && fields[0] instanceof LongWritable
+          && fields[1] instanceof LongWritable;
+    }
+
+    /**
+     * Copies the value stored under {@code key} into {@code valueHolder} by
+     * serializing it and deserializing into the holder, so the caller never
+     * shares the stored instance.
+     *
+     * @return {@code true} if a value was found and copied; {@code false} if
+     *         the key is absent or the copy failed
+     */
+    @Override
+    public boolean getInformation(String key, Writable valueHolder) {
+      LOG.info("Getting value for key " + key);
+      Writable value = valueMap.get(key);
+      if (value == null) {
+        return false;
+      }
+      ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+      DataOutputStream outputStream = new DataOutputStream(byteStream);
+      try {
+        value.write(outputStream);
+        outputStream.flush();
+        DataInputStream inputStream = new DataInputStream(
+            new ByteArrayInputStream(byteStream.toByteArray()));
+        valueHolder.readFields(inputStream);
+        return true;
+      } catch (IOException e) {
+        LOG.error("Error writing data to write buffer.", e);
+        return false;
+      } finally {
+        try {
+          // Closing the DataOutputStream also closes the wrapped byte stream.
+          outputStream.close();
+        } catch (IOException e) {
+          LOG.error("Error closing byte stream.", e);
+        }
+      }
+    }
+
+    /** Marks {@code key} as present by storing {@link NullWritable}. */
+    @Override
+    public boolean addKey(String key, boolean permanent,
+        SyncEventListener listener) {
+      valueMap.put(key, NullWritable.get());
+      return true;
+    }
+
+    @Override
+    public boolean hasKey(String key) {
+      return valueMap.containsKey(key);
+    }
+
+    /** Returns every stored full key that starts with {@code key + "/"}. */
+    @Override
+    public String[] getChildKeySet(String key, SyncEventListener listener) {
+      String prefix = key + "/";
+      List<String> children = new ArrayList<String>();
+      for (String candidate : valueMap.keySet()) {
+        if (candidate.startsWith(prefix)) {
+          children.add(candidate);
+        }
+      }
+      return children.toArray(new String[children.size()]);
+    }
+
+    /** Listeners are not supported; always returns {@code false}. */
+    @Override
+    public boolean registerListener(String key, SyncEvent event,
+        SyncEventListener listener) {
+      return false;
+    }
+
+    // NOTE(review): returns false even when a key was removed; confirm what
+    // callers of PeerSyncClient.remove expect.
+    @Override
+    public boolean remove(String key, SyncEventListener listener) {
+      valueMap.remove(key);
+      return false;
+    }
+
+    @Override
+    public void init(Configuration conf, BSPJobID jobId, TaskAttemptID taskId)
+        throws Exception {
+    }
+
+    @Override
+    public void enterBarrier(BSPJobID jobId, TaskAttemptID taskId,
+        long superstep) throws SyncException {
+      LOG.info("Enter barrier called - " + superstep);
+    }
+
+    @Override
+    public void leaveBarrier(BSPJobID jobId, TaskAttemptID taskId,
+        long superstep) throws SyncException {
+      LOG.info("Exit barrier called - " + superstep);
+    }
+
+    @Override
+    public void register(BSPJobID jobId, TaskAttemptID taskId,
+        String hostAddress, long port) {
+    }
+
+    @Override
+    public String[] getAllPeerNames(TaskAttemptID taskId) {
+      return null;
+    }
+
+    @Override
+    public void deregisterFromBarrier(BSPJobID jobId, TaskAttemptID taskId,
+        String hostAddress, long port) {
+    }
+
+    @Override
+    public void stopServer() {
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+  }
+
+  /**
+   * Asserts that the checkpoint record stored in the sync client for
+   * {@code bspTask} holds the given superstep and message count.
+   *
+   * @param step expected superstep value in the stored record
+   * @param count expected message count in the stored record
+   */
+  private void checkSuperstepMsgCount(PeerSyncClient syncClient,
+      @SuppressWarnings("rawtypes")
+      BSPPeer bspTask, BSPJob job, long step, long count) {
+    String checkpointKey = syncClient.constructKey(job.getJobID(),
+        "checkpoint", String.valueOf(bspTask.getPeerIndex()));
+    ArrayWritable stored = new ArrayWritable(LongWritable.class);
+
+    assertTrue(syncClient.getInformation(checkpointKey, stored));
+
+    Writable[] fields = stored.get();
+    assertEquals(step, ((LongWritable) fields[0]).get());
+    assertEquals(count, ((LongWritable) fields[1]).get());
+  }
+
+  /**
+   * With {@code CHECKPOINT_INTERVAL = 2}, the stored (superstep, message
+   * count) record should be updated at supersteps 1, 3 and 5 and stay
+   * unchanged at supersteps 2, 4 and 6.
+   */
+  public void testCheckpointInterval() throws Exception {
+    Configuration config = new Configuration();
+    System.setProperty("user.dir", "/tmp");
+    config.set(SyncServiceFactory.SYNC_PEER_CLASS,
+        TempSyncClient.class.getName());
+    config.set(Constants.FAULT_TOLERANCE_CLASS,
+        AsyncRcvdMsgCheckpointImpl.class.getName());
+    config.setBoolean(Constants.FAULT_TOLERANCE_FLAG, true);
+    config.setBoolean(Constants.CHECKPOINT_ENABLED, true);
+    config.setInt(Constants.CHECKPOINT_INTERVAL, 2);
+    config.set("bsp.output.dir", "/tmp/hama-test_out");
+    config.set("bsp.local.dir", "/tmp/hama-test");
+
+    FileSystem dfs = FileSystem.get(config);
+    try {
+      BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp");
+      TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1),
+          1);
+
+      TestMessageManager messenger = new TestMessageManager();
+      PeerSyncClient syncClient = (TempSyncClient) SyncServiceFactory
+          .getPeerSyncClient(config);
+      @SuppressWarnings("rawtypes")
+      BSPPeer bspTask = new TestBSPPeer(job, config, taskId, new Counters(),
+          -1L, (BSPPeerSyncClient) syncClient, messenger,
+          TaskStatus.State.RUNNING);
+
+      assertNotNull("BSPPeerImpl should not be null.", bspTask);
+      LOG.info("Created bsp peer and other parameters");
+
+      // No checkpoint record exists before the first barrier.
+      boolean result = syncClient.getInformation(
+          syncClient.constructKey(job.getJobID(), "checkpoint",
+              "" + bspTask.getPeerIndex()),
+          new ArrayWritable(LongWritable.class));
+      assertFalse(result);
+
+      bspTask.sync(); // superstep 1: record written as (1, 0)
+      checkSuperstepMsgCount(syncClient, bspTask, job, 1L, 0L);
+
+      Text txtMessage = new Text("data");
+      messenger.addMessage(txtMessage);
+
+      bspTask.sync(); // superstep 2: record unchanged
+      checkSuperstepMsgCount(syncClient, bspTask, job, 1L, 0L);
+
+      messenger.addMessage(txtMessage);
+
+      bspTask.sync(); // superstep 3: record updated to (3, 1)
+      checkSuperstepMsgCount(syncClient, bspTask, job, 3L, 1L);
+
+      bspTask.sync(); // superstep 4: record unchanged
+      checkSuperstepMsgCount(syncClient, bspTask, job, 3L, 1L);
+
+      messenger.addMessage(txtMessage);
+      messenger.addMessage(txtMessage);
+
+      bspTask.sync(); // superstep 5: record updated to (5, 2)
+      checkSuperstepMsgCount(syncClient, bspTask, job, 5L, 2L);
+
+      bspTask.sync(); // superstep 6: record unchanged
+      checkSuperstepMsgCount(syncClient, bspTask, job, 5L, 2L);
+    } finally {
+      // Always remove checkpoint files so a failed assertion does not leave
+      // state behind for the other tests that write to the same directory.
+      dfs.delete(new Path("checkpoint"), true);
+    }
+  }
+
+  /**
+   * Verifies the checkpoint record after each of two supersteps, then reads
+   * back the message persisted for superstep 2 of peer 1.
+   */
+  @SuppressWarnings("rawtypes")
   public void testCheckpoint() throws Exception {
     Configuration config = new Configuration();
     config.set(SyncServiceFactory.SYNC_PEER_CLASS,
-        LocalBSPRunner.LocalSyncClient.class.getName());
+        TempSyncClient.class.getName());
+    config.setBoolean(Constants.FAULT_TOLERANCE_FLAG, true);
+    config.set(Constants.FAULT_TOLERANCE_CLASS,
+        AsyncRcvdMsgCheckpointImpl.class.getName());
+    config.setBoolean(Constants.CHECKPOINT_ENABLED, true);
+    int port = BSPNetUtils.getFreePort(12502);
+    LOG.info("Got port = " + port);
+
+    config.set(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST);
+    config.setInt(Constants.PEER_PORT, port);
+
     config.set("bsp.output.dir", "/tmp/hama-test_out");
+    config.set("bsp.local.dir", "/tmp/hama-test");
+
     FileSystem dfs = FileSystem.get(config);
+    BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp");
+    TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1), 1);
+
+    TestMessageManager messenger = new TestMessageManager();
+    PeerSyncClient syncClient = (TempSyncClient) SyncServiceFactory
+        .getPeerSyncClient(config);
+    BSPPeer bspTask = new TestBSPPeer(job, config, taskId, new Counters(), -1L,
+        (BSPPeerSyncClient) syncClient, messenger, TaskStatus.State.RUNNING);
 
-    BSPPeerImpl bspTask = new BSPPeerImpl(config, dfs);
-    bspTask.setCurrentTaskStatus(new TaskStatus(new BSPJobID(),
-        new TaskAttemptID(), 1.0f, TaskStatus.State.RUNNING, "running",
-        "127.0.0.1", TaskStatus.Phase.STARTING, new Counters()));
     assertNotNull("BSPPeerImpl should not be null.", bspTask);
-    if (dfs.mkdirs(new Path("checkpoint"))) {
-      if (dfs.mkdirs(new Path("checkpoint/job_201110302255_0001"))) {
-        if (dfs.mkdirs(new Path("checkpoint/job_201110302255_0001/0")))
-          ;
-      }
-    }
-    assertTrue("Make sure directory is created.",
-        dfs.exists(new Path(checkpointedDir)));
-    byte[] tmpData = "data".getBytes();
-    BSPMessageBundle bundle = new BSPMessageBundle();
-    bundle.addMessage(new ByteMessage("abc".getBytes(), tmpData));
-    assertNotNull("Message bundle can not be null.", bundle);
-    assertNotNull("Configuration should not be null.", config);
-    bspTask.checkpoint(checkpointedDir + "/attempt_201110302255_0001_000000_0",
-        bundle);
-    FSDataInputStream in = dfs.open(new Path(checkpointedDir
-        + "/attempt_201110302255_0001_000000_0"));
-    BSPMessageBundle bundleRead = new BSPMessageBundle();
-    bundleRead.readFields(in);
-    in.close();
-    ByteMessage byteMsg = (ByteMessage) (bundleRead.getMessages()).get(0);
-    String content = new String(byteMsg.getData());
-    LOG.info("Saved checkpointed content is " + content);
-    assertTrue("Message content should be the same.", "data".equals(content));
+
+    LOG.info("Created bsp peer and other parameters");
+
+    bspTask.sync();
+    LOG.info("Completed first sync.");
+
+    checkSuperstepMsgCount(syncClient, bspTask, job, 1L, 0L);
+
+    Text txtMessage = new Text("data");
+    messenger.addMessage(txtMessage);
+
+    bspTask.sync();
+
+    LOG.info("Completed second sync.");
+
+    checkSuperstepMsgCount(syncClient, bspTask, job, 2L, 1L);
+
+    // Checking the messages for superstep 2 and peer id 1
+    String expectedPath = "checkpoint/job_checkpttest_0001/2/1";
+    FSDataInputStream in = dfs.open(new Path(expectedPath));
+    try {
+      // Each checkpointed message is read as its class name followed by its
+      // Writable payload.
+      String className = in.readUTF();
+      Text message = (Text) ReflectionUtils.newInstance(
+          Class.forName(className), config);
+      message.readFields(in);
+
+      assertEquals("data", message.toString());
+    } finally {
+      // Release the file handle before the checkpoint directory is deleted.
+      in.close();
+    }
+
     dfs.delete(new Path("checkpoint"), true);
   }
 
-  public void testCheckpointInterval() throws Exception {
+  /**
+   * Simulates a restarted task. It seeds the sync store with a checkpoint
+   * record for superstep 3 holding 5 messages, writes the matching message
+   * file, constructs a peer in RECOVERING state, and checks that the 5
+   * messages are looped back to the message manager.
+   */
+  public void testPeerRecovery() throws Exception {
+    Configuration config = new Configuration();
+    config.set(SyncServiceFactory.SYNC_PEER_CLASS,
+        TempSyncClient.class.getName());
+    config.set(Constants.FAULT_TOLERANCE_CLASS,
+        AsyncRcvdMsgCheckpointImpl.class.getName());
+    config.setBoolean(Constants.CHECKPOINT_ENABLED, true);
+    int port = BSPNetUtils.getFreePort(12502);
+    LOG.info("Got port = " + port);
 
-    Configuration conf = new Configuration();
-    conf.set("bsp.output.dir", "/tmp/hama-test_out");
-    conf.setClass(SyncServiceFactory.SYNC_PEER_CLASS,
-        LocalBSPRunner.LocalSyncClient.class, SyncClient.class);
-
-    conf.setBoolean(Constants.CHECKPOINT_ENABLED, false);
-
-    int port = BSPNetUtils.getFreePort(5000);
-    InetSocketAddress inetAddress = new InetSocketAddress(port);
-    MinimalGroomServer groom = new MinimalGroomServer(conf);
-    Server workerServer = RPC.getServer(groom, inetAddress.getHostName(),
-        inetAddress.getPort(), conf);
-    workerServer.start();
-
-    LOG.info("Started RPC server");
-    conf.setInt("bsp.groom.rpc.port", inetAddress.getPort());
-    conf.setInt("bsp.peers.num", 1);
-
-    BSPPeerProtocol umbilical = (BSPPeerProtocol) RPC.getProxy(
-        BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID, inetAddress,
-        conf);
-    LOG.info("Started the proxy connections");
-
-    TaskAttemptID tid = new TaskAttemptID(new TaskID(new BSPJobID(
-        "job_201110102255", 1), 1), 1);
-
-    try {
-      BSPJob job = new BSPJob(new HamaConfiguration(conf));
-      job.setOutputPath(TestBSPMasterGroomServer.OUTPUT_PATH);
-      job.setOutputFormat(TextOutputFormat.class);
-      final BSPPeerProtocol proto = (BSPPeerProtocol) RPC.getProxy(
-          BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID,
-          new InetSocketAddress("127.0.0.1", port), conf);
+    config.set(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST);
+    config.setInt(Constants.PEER_PORT, port);
 
-      BSPTask task = new BSPTask();
-      task.setConf(job);
+    config.set("bsp.output.dir", "/tmp/hama-test_out");
+    config.set("bsp.local.dir", "/tmp/hama-test");
 
-      @SuppressWarnings("rawtypes")
-      BSPPeerImpl<?, ?, ?, ?, ?> bspPeer = new BSPPeerImpl(job, conf, tid,
-          proto, 0, null, null, new Counters());
+    FileSystem dfs = FileSystem.get(config);
+    BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp");
+    TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1), 1);
 
-      bspPeer.setCurrentTaskStatus(new TaskStatus(new BSPJobID(), tid, 1.0f,
-          TaskStatus.State.RUNNING, "running", "127.0.0.1",
-          TaskStatus.Phase.STARTING, new Counters()));
-
-      assertEquals(bspPeer.isReadyToCheckpoint(), false);
-
-      conf.setBoolean(Constants.CHECKPOINT_ENABLED, true);
-      conf.setInt(Constants.CHECKPOINT_INTERVAL, 3);
-
-      bspPeer.sync();
-
-      LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
-          + bspPeer.getSuperstepCount());
-      assertEquals(bspPeer.isReadyToCheckpoint(), false);
-      bspPeer.sync();
-      LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
-          + bspPeer.getSuperstepCount());
-      assertEquals(bspPeer.isReadyToCheckpoint(), false);
-      bspPeer.sync();
-      LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
-          + bspPeer.getSuperstepCount());
-      assertEquals(bspPeer.isReadyToCheckpoint(), true);
-
-      job.setCheckPointInterval(5);
-      bspPeer.sync();
-      LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
-          + bspPeer.getSuperstepCount());
-      assertEquals(bspPeer.isReadyToCheckpoint(), false);
-      bspPeer.sync();
-      LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
-          + bspPeer.getSuperstepCount());
-      assertEquals(bspPeer.isReadyToCheckpoint(), false);
-
-    } catch (Exception e) {
-      LOG.error("Error testing BSPPeer.", e);
-    } finally {
-      umbilical.close();
-      Thread.sleep(2000);
-      workerServer.stop();
-      Thread.sleep(2000);
+    TestMessageManager messenger = new TestMessageManager();
+    PeerSyncClient syncClient = (TempSyncClient) SyncServiceFactory
+        .getPeerSyncClient(config);
+
+    // Seed a checkpoint as if a previous attempt had reached superstep 3
+    // with 5 received messages: first the (superstep, count) record ...
+    Text txtMessage = new Text("data");
+    String writeKey = "job_checkpttest_0001/checkpoint/1/";
+
+    Writable[] writableArr = new Writable[2];
+    writableArr[0] = new LongWritable(3L);
+    writableArr[1] = new LongWritable(5L);
+    ArrayWritable arrWritable = new ArrayWritable(LongWritable.class);
+    arrWritable.set(writableArr);
+    syncClient.storeInformation(writeKey, arrWritable, true, null);
+
+    // ... then the message file, one (class name, payload) pair per message.
+    String writePath = "checkpoint/job_checkpttest_0001/3/1";
+    FSDataOutputStream out = dfs.create(new Path(writePath));
+    for (int i = 0; i < 5; ++i) {
+      out.writeUTF(txtMessage.getClass().getCanonicalName());
+      txtMessage.write(out);
     }
+    out.close();
 
+    // Constructing the peer in RECOVERING state is expected to replay the
+    // checkpointed messages into the messenger's loopback bundle.
+    @SuppressWarnings("unused")
+    BSPPeer<?, ?, ?, ?, Text> bspTask = new TestBSPPeer(job, config, taskId,
+        new Counters(), 3L, (BSPPeerSyncClient) syncClient, messenger,
+        TaskStatus.State.RECOVERING);
+
+    BSPMessageBundle<Text> bundleRead = messenger.getLoopbackBundle();
+    assertEquals(5, bundleRead.getMessages().size());
+    String recoveredMsg = bundleRead.getMessages().get(0).toString();
+    // assertEquals takes (expected, actual); the reversed order produced
+    // misleading failure messages.
+    assertEquals("data", recoveredMsg);
+    dfs.delete(new Path("checkpoint"), true);
   }
+
 }

Added: hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestTaskAllocation.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestTaskAllocation.java?rev=1369551&view=auto
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestTaskAllocation.java (added)
+++ hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestTaskAllocation.java Sun Aug  5 11:07:48 2012
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.BSPJobClient.RawSplit;
+import org.apache.hama.bsp.taskallocation.BSPResource;
+import org.apache.hama.bsp.taskallocation.BestEffortDataLocalTaskAllocator;
+import org.apache.hama.bsp.taskallocation.TaskAllocationStrategy;
+
+/**
+ * Tests for the task allocation strategies.
+ */
+public class TestTaskAllocation extends TestCase {
+
+  public static final Log LOG = LogFactory.getLog(TestTaskAllocation.class);
+
+  /**
+   * Given ten grooms ("host0".."host9") with no assigned tasks, the
+   * best-effort data-local allocator should select every groom listed in the
+   * split's locations.
+   */
+  public void testBestEffortDataLocality() throws Exception {
+
+    Configuration conf = new Configuration();
+
+    String[] locations = new String[] { "host6", "host4", "host3" };
+    byte[] data = "data".getBytes();
+    RawSplit split = new RawSplit();
+    split.setLocations(locations);
+    split.setBytes(data, 0, data.length);
+    split.setDataLength(data.length);
+
+    assertEquals(data.length, (int) split.getDataLength());
+
+    Map<GroomServerStatus, Integer> taskCountInGroomMap = new HashMap<GroomServerStatus, Integer>(
+        20);
+    BSPResource[] resources = new BSPResource[0];
+    BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp");
+    JobInProgress jobProgress = new JobInProgress(job.getJobID(), conf);
+    TaskInProgress taskInProgress = new TaskInProgress(job.getJobID(),
+        "job.xml", split, conf, jobProgress, 1);
+
+    // Ten grooms, each starting with an empty task list and a zero entry in
+    // the task-count map.
+    Map<String, GroomServerStatus> groomStatuses = new HashMap<String, GroomServerStatus>(
+        20);
+    for (int i = 0; i < 10; ++i) {
+      String name = "host" + i;
+      GroomServerStatus status = new GroomServerStatus(name,
+          new ArrayList<TaskStatus>(), 0, 3);
+      groomStatuses.put(name, status);
+      taskCountInGroomMap.put(status, 0);
+    }
+
+    // Instantiate the strategy under test directly; ReflectionUtils also
+    // hands it the Configuration if it is Configurable.
+    TaskAllocationStrategy strategy = ReflectionUtils.newInstance(
+        BestEffortDataLocalTaskAllocator.class, conf);
+
+    String[] hosts = strategy.selectGrooms(groomStatuses, taskCountInGroomMap,
+        resources, taskInProgress);
+
+    List<String> selected = new ArrayList<String>();
+    for (String host : hosts) {
+      selected.add(host);
+    }
+
+    for (String location : locations) {
+      assertTrue("Expected " + location + " among selected grooms " + selected,
+          selected.contains(location));
+    }
+  }
+
+}

Modified: hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java (original)
+++ hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java Sun Aug  5 11:07:48 2012
@@ -23,7 +23,9 @@ import java.io.IOException;
 
 import junit.framework.TestCase;
 
+import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
@@ -41,6 +43,7 @@ public class TestZooKeeper extends TestC
 
   public TestZooKeeper() {
     configuration = new HamaConfiguration();
+    System.setProperty("user.dir", "/tmp");
     configuration.set("bsp.master.address", "localhost");
     assertEquals("Make sure master addr is set to localhost:", "localhost",
         configuration.get("bsp.master.address"));
@@ -144,12 +147,32 @@ public class TestZooKeeper extends TestC
 
       Log.info("Passed the key presence test");
 
-      Writable value = masterClient
-          .getInformation(masterClient.constructKey(jobID, "info", "level2"),
-              IntWritable.class);
-
-      assertEquals(null, value);
-      Log.info("Passed the null value check.");
+      boolean result = masterClient
+          .getInformation(masterClient.constructKey(jobID, "info", "level3"),
+              new IntWritable());
+
+      assertEquals(false, result);
+      
+      Writable[] writableArr = new Writable[2];
+      writableArr[0] = new LongWritable(3L);
+      writableArr[1] = new LongWritable(5L);
+      ArrayWritable arrWritable = new ArrayWritable(LongWritable.class);
+      arrWritable.set(writableArr);
+      masterClient.storeInformation(
+          masterClient.constructKey(jobID, "info", "level3"), 
+          arrWritable, true, null);
+      
+      ArrayWritable valueHolder = new ArrayWritable(LongWritable.class);
+      
+      boolean getResult = masterClient.getInformation(
+          masterClient.constructKey(jobID, "info", "level3"), valueHolder);
+      
+      assertTrue(getResult);
+      
+      assertEquals(arrWritable.get()[0], valueHolder.get()[0]);
+      assertEquals(arrWritable.get()[1], valueHolder.get()[1]);
+      
+      Log.info("Passed array writable test");
       done = true;
 
     } catch (Exception e) {

Modified: hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java (original)
+++ hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java Sun Aug  5 11:07:48 2012
@@ -17,7 +17,6 @@
  */
 package org.apache.hama.bsp.sync;
 
-import java.io.File;
 import java.util.concurrent.Executors;
 
 import junit.framework.TestCase;
@@ -36,38 +35,36 @@ import org.apache.hama.util.BSPNetUtils;
 public class TestSyncServiceFactory extends TestCase {
 
   public static final Log LOG = LogFactory.getLog(TestCase.class);
-  
-  public static class ListenerTest extends ZKSyncEventListener{
+
+  public static class ListenerTest extends ZKSyncEventListener {
 
     private Text value;
-    
-    public ListenerTest(){
+
+    public ListenerTest() {
       value = new Text("init");
     }
-    
-    public String getValue(){
+
+    public String getValue() {
       return value.toString();
     }
-    
+
     @Override
     public void onDelete() {
-      // TODO Auto-generated method stub
-      
+
     }
 
     @Override
     public void onChange() {
       LOG.info("ZK value changed event triggered.");
       value.set("Changed");
-      
+
     }
 
     @Override
     public void onChildKeySetChange() {
-      // TODO Auto-generated method stub
-      
+
     }
-    
+
   }
 
   public void testClientInstantiation() throws Exception {
@@ -96,7 +93,6 @@ public class TestSyncServiceFactory exte
 
     @Override
     public void run() {
-      // TODO Auto-generated method stub
       try {
         server.start();
       } catch (Exception e) {
@@ -109,10 +105,13 @@ public class TestSyncServiceFactory exte
   public void testZKSyncStore() throws Exception {
     Configuration conf = new Configuration();
     int zkPort = BSPNetUtils.getFreePort(21811);
+    conf.set("bsp.local.dir", "/tmp/hama-test");
+    conf.set("bsp.output.dir", "/tmp/hama-test_out");
     conf.setInt(Constants.PEER_PORT, zkPort);
     conf.set(Constants.ZOOKEEPER_QUORUM, "localhost");
     conf.setInt(Constants.ZOOKEEPER_CLIENT_PORT, zkPort);
     conf.set(Constants.ZOOKEEPER_SESSION_TIMEOUT, "12000");
+    System.setProperty("user.dir", "/tmp");
     // given null, should return zookeeper
     final SyncServer syncServer = SyncServiceFactory.getSyncServer(conf);
     syncServer.init(conf);
@@ -123,8 +122,8 @@ public class TestSyncServiceFactory exte
 
     Thread.sleep(1000);
 
-    final PeerSyncClient syncClient = (PeerSyncClient)
-    		SyncServiceFactory.getPeerSyncClient(conf);
+    final PeerSyncClient syncClient = (PeerSyncClient) SyncServiceFactory
+        .getPeerSyncClient(conf);
     assertTrue(syncClient instanceof ZooKeeperSyncClientImpl);
     BSPJobID jobId = new BSPJobID("abc", 1);
     TaskAttemptID taskId = new TaskAttemptID(new TaskID(jobId, 1), 1);
@@ -142,69 +141,45 @@ public class TestSyncServiceFactory exte
       }
     });
 
-    try {
-      IntWritable data = new IntWritable(5);
-      syncClient.storeInformation(
-          syncClient.constructKey(jobId, String.valueOf(1L), "test"), data,
-          true, null);
-      
-      ListenerTest listenerTest = new ListenerTest();
-      
-      
-      syncClient.registerListener(
-          syncClient.constructKey(jobId, String.valueOf(1L), "test"), 
-          ZKSyncEventFactory.getValueChangeEvent(),
-          listenerTest);
-      
-      IntWritable value = (IntWritable) syncClient.getInformation(
-          syncClient.constructKey(jobId, String.valueOf(1L), "test"),
-          IntWritable.class);
-      assertTrue(value != null);
-      int intVal = value == null ? 0 : value.get();
-      assertTrue(intVal == data.get());
-      
-      data.set(6);
-      syncClient.storeInformation(
-          syncClient.constructKey(jobId, String.valueOf(1L), "test"), data,
-          true, null);
-      value = (IntWritable) syncClient.getInformation(
-          syncClient.constructKey(jobId, String.valueOf(1L), "test"),
-          IntWritable.class);
-
-      
-      intVal = value == null ? 0 : value.get();
-      assertTrue(intVal == data.get());
-      
-      Thread.sleep(5000);
-      
-      assertEquals(true, listenerTest.getValue().equals("Changed"));
-      
-      
-      syncServer.stopServer();
-    } finally {
-
-      String dir = System.getProperty("user.dir");
-      LOG.info("Deleting zookeeper files in " + dir);
-      File zookeeperDir = new File(dir + File.separator + "nullzookeeper");
-      if (zookeeperDir.exists()) {
-        File[] files = zookeeperDir.listFiles();
-        for (File file : files) {
-          if (file.isDirectory()) {
-            File[] childFiles = file.listFiles();
-            for (File childFile : childFiles) {
-              LOG.info("Deleting zookeeper file - "
-                  + childFile.getAbsolutePath());
-              childFile.delete();
-            }
-          } else {
-            LOG.info("Deleting zookeeper file - " + file.getAbsolutePath());
-            file.delete();
-          }
-        }
-        zookeeperDir.delete();
+    IntWritable data = new IntWritable(5);
+    syncClient.storeInformation(
+        syncClient.constructKey(jobId, String.valueOf(1L), "test"), data, true,
+        null);
+
+    ListenerTest listenerTest = new ListenerTest();
+
+    syncClient.registerListener(
+        syncClient.constructKey(jobId, String.valueOf(1L), "test"),
+        ZKSyncEventFactory.getValueChangeEvent(), listenerTest);
+
+    IntWritable valueHolder = new IntWritable();
+    boolean result = syncClient
+        .getInformation(
+            syncClient.constructKey(jobId, String.valueOf(1L), "test"),
+            valueHolder);
+    assertTrue(result);
+    int intVal = valueHolder.get();
+    assertTrue(intVal == data.get());
+
+    data.set(6);
+    syncClient.storeInformation(
+        syncClient.constructKey(jobId, String.valueOf(1L), "test"), data, true,
+        null);
+    valueHolder = new IntWritable();
+    result = syncClient
+        .getInformation(
+            syncClient.constructKey(jobId, String.valueOf(1L), "test"),
+            valueHolder);
+
+    assertTrue(result);
+    intVal = valueHolder.get();
+    assertTrue(intVal == data.get());
 
-      }
-    }
+    Thread.sleep(5000);
+
+    assertEquals(true, listenerTest.getValue().equals("Changed"));
+
+    syncServer.stopServer();
 
   }