You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2011/09/27 11:35:48 UTC

svn commit: r1176297 [6/19] - in /incubator/hama/branches/HamaV2: ./ api/ api/target/ api/target/classes/ api/target/classes/META-INF/ api/target/lib/ api/target/maven-archiver/ api/target/maven-shared-archive-resources/ api/target/maven-shared-archive...

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,711 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hama.Constants;
+import org.apache.hama.checkpoint.CheckpointRunner;
+import org.apache.hama.ipc.BSPPeerProtocol;
+import org.apache.hama.zookeeper.QuorumPeer;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * This class represents a BSP peer.
+ */
+public class BSPPeer implements Watcher, BSPPeerInterface {
+
+  public static final Log LOG = LogFactory.getLog(BSPPeer.class);
+
+  private final Configuration conf;
+  private BSPJob jobConf;
+
+  // RPC server for this peer; created in reinitialize(), stopped in close().
+  private volatile Server server = null;
+  // ZooKeeper client used for barrier synchronization; created in
+  // reinitialize().
+  private ZooKeeper zk = null;
+  // Private monitor for the barrier wait/notify handshake. This must not be a
+  // boxed Integer: Integer 0 comes from the JVM-wide Integer cache, so every
+  // BSPPeer instance (and any unrelated code locking on Integer 0) would
+  // share one monitor and wake each other up.
+  private final Object mutex = new Object();
+
+  private final String bspRoot;
+  private final String quorumServers;
+
+  // RPC proxies to remote peers, keyed by address (see getBSPPeerConnection).
+  private final Map<InetSocketAddress, BSPPeerInterface> peers = new ConcurrentHashMap<InetSocketAddress, BSPPeerInterface>();
+  // Messages queued by send() per target address until the next sync().
+  private final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>();
+  // Messages readable in the current superstep via getCurrentMessage().
+  private ConcurrentLinkedQueue<BSPMessage> localQueue = new ConcurrentLinkedQueue<BSPMessage>();
+  // Messages received through put()/local send(); becomes localQueue at the
+  // end of sync().
+  private ConcurrentLinkedQueue<BSPMessage> localQueueForNextIteration = new ConcurrentLinkedQueue<BSPMessage>();
+  // Peer name ("host:port") to parsed socket address, filled by send().
+  private final Map<String, InetSocketAddress> peerSocketCache = new ConcurrentHashMap<String, InetSocketAddress>();
+
+  private InetSocketAddress peerAddress;
+  private TaskStatus currentTaskStatus;
+
+  private TaskAttemptID taskid;
+  private BSPPeerProtocol umbilical;
+
+  // Null unless "bsp.checkpoint.enabled" is set (see the public constructor).
+  private final BSPMessageSerializer messageSerializer;
+
+  /**
+   * Writable that pairs a checkpoint path with the message bundle to be
+   * stored there. On the wire it is the path (modified UTF-8) followed by the
+   * bundle's own serialized form.
+   */
+  public static final class BSPSerializableMessage implements Writable {
+    final AtomicReference<String> path = new AtomicReference<String>();
+    final AtomicReference<BSPMessageBundle> bundle = new AtomicReference<BSPMessageBundle>();
+
+    /** Empty instance, to be populated through {@link #readFields(DataInput)}. */
+    public BSPSerializableMessage() {
+    }
+
+    /**
+     * @param path where the bundle is checkpointed; must not be null.
+     * @param bundle the messages to checkpoint; must not be null.
+     * @throws NullPointerException if either argument is null.
+     */
+    public BSPSerializableMessage(final String path,
+        final BSPMessageBundle bundle) {
+      if (path == null) {
+        throw new NullPointerException("No path provided for checkpointing.");
+      }
+      if (bundle == null) {
+        throw new NullPointerException("No data provided for checkpointing.");
+      }
+      this.path.set(path);
+      this.bundle.set(bundle);
+    }
+
+    /** @return the checkpoint path, or null if none has been set or read yet. */
+    public final String checkpointedPath() {
+      return path.get();
+    }
+
+    /** @return the message bundle, or null if none has been set or read yet. */
+    public final BSPMessageBundle messageBundle() {
+      return bundle.get();
+    }
+
+    /** Writes the path first, then delegates to the bundle. */
+    @Override
+    public final void write(DataOutput out) throws IOException {
+      final String target = path.get();
+      final BSPMessageBundle payload = bundle.get();
+      out.writeUTF(target);
+      payload.write(out);
+    }
+
+    /** Reads the path, then a fresh bundle, in the order used by write(). */
+    @Override
+    public final void readFields(DataInput in) throws IOException {
+      path.set(in.readUTF());
+      final BSPMessageBundle restored = new BSPMessageBundle();
+      restored.readFields(in);
+      bundle.set(restored);
+    }
+
+  }// serializable message
+
+  /**
+   * Streams checkpoint messages to a process listening on a local port.
+   * Writes are handed to a scheduled thread pool whose size is read from
+   * "bsp.checkpoint.serializer_thread" (default 10).
+   */
+  final class BSPMessageSerializer {
+    final Socket client;
+    final ScheduledExecutorService sched;
+
+    /**
+     * Connects to localhost:port, trying up to 10 times with a one second
+     * pause after each failed attempt.
+     * 
+     * @param port the local port to connect to.
+     * @throws NullPointerException if no connection could be established.
+     */
+    public BSPMessageSerializer(final int port) {
+      Socket tmp = null;
+      for (int attempt = 0; attempt < 10; attempt++) {
+        tmp = init(port);
+        if (null != tmp) {
+          // Connected: do not pay the retry delay on the success path.
+          break;
+        }
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ie) {
+          LOG.warn("Thread is interrupted.", ie);
+          Thread.currentThread().interrupt();
+          // Stop retrying; with the flag set every further sleep would
+          // throw immediately anyway.
+          break;
+        }
+      }
+      this.client = tmp;
+      if (null == this.client)
+        throw new NullPointerException("Client socket is null.");
+      this.sched = Executors.newScheduledThreadPool(conf.getInt(
+          "bsp.checkpoint.serializer_thread", 10));
+      LOG.info(BSPMessageSerializer.class.getName()
+          + " is ready to serialize message.");
+    }
+
+    /**
+     * Makes a single connection attempt.
+     * 
+     * @return the connected socket, or null if the attempt failed.
+     */
+    private Socket init(final int port) {
+      Socket tmp = null;
+      try {
+        tmp = new Socket("localhost", port);
+      } catch (UnknownHostException uhe) {
+        LOG.error("Unable to connect to BSPMessageDeserializer.", uhe);
+      } catch (IOException ioe) {
+        LOG.warn("Fail to create socket.", ioe);
+      }
+      return tmp;
+    }
+
+    /**
+     * Schedules the message to be written to the socket without delay.
+     * 
+     * NOTE(review): every task writes to the same socket stream and the pool
+     * may run several tasks at once, so bundles could interleave on the wire
+     * - confirm whether the receiver tolerates this.
+     * 
+     * @param tmp the message to write.
+     * @throws IOException if the socket's output stream cannot be obtained.
+     */
+    void serialize(final BSPSerializableMessage tmp) throws IOException {
+      if (LOG.isDebugEnabled())
+        LOG.debug("Messages are saved to " + tmp.checkpointedPath());
+      final DataOutput out = new DataOutputStream(client.getOutputStream());
+      this.sched.schedule(new Callable<Object>() {
+        public Object call() throws Exception {
+          try {
+            tmp.write(out);
+          } catch (IOException ioe) {
+            // Nobody inspects the returned Future, so log here or the
+            // failure would be lost.
+            LOG.error("Fail to serialize messages to "
+                + tmp.checkpointedPath(), ioe);
+            throw ioe;
+          }
+          return null;
+        }
+      }, 0, SECONDS);
+    }
+
+    /**
+     * Closes the socket and shuts the thread pool down. The pool is shut down
+     * even when closing the socket fails.
+     */
+    public void close() {
+      try {
+        this.client.close();
+      } catch (IOException io) {
+        LOG.error("Fail to close client socket.", io);
+      } finally {
+        this.sched.shutdown();
+      }
+    }
+
+  }// message serializer
+
+  /**
+   * No-argument constructor kept for LocalBSPRunner. All final fields are
+   * left null, so no configuration, ZooKeeper settings or checkpoint
+   * serializer are available on an instance built this way.
+   */
+  protected BSPPeer() {
+    this.conf = null;
+    this.bspRoot = null;
+    this.quorumServers = null;
+    this.messageSerializer = null;
+  }
+
+  /**
+   * BSPPeer Constructor.
+   * 
+   * BSPPeer acts on behalf of clients performing bsp() tasks.
+   * 
+   * @param conf is the configuration file containing bsp peer host, port, etc.
+   * @param umbilical is the bsp protocol used to contact its parent process.
+   * @param taskid is the id that current process holds.
+   */
+  public BSPPeer(Configuration conf, TaskAttemptID taskid,
+      BSPPeerProtocol umbilical) throws IOException {
+    this.conf = conf;
+    this.taskid = taskid;
+    this.umbilical = umbilical;
+
+    String bindAddress = conf.get(Constants.PEER_HOST,
+        Constants.DEFAULT_PEER_HOST);
+    int bindPort = conf
+        .getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
+    bspRoot = conf.get(Constants.ZOOKEEPER_ROOT,
+        Constants.DEFAULT_ZOOKEEPER_ROOT);
+    quorumServers = QuorumPeer.getZKQuorumServersString(conf);
+    if (LOG.isDebugEnabled())
+      LOG.debug("Quorum  " + quorumServers);
+    peerAddress = new InetSocketAddress(bindAddress, bindPort);
+    BSPMessageSerializer msgSerializer = null;
+    if (this.conf.getBoolean("bsp.checkpoint.enabled", false)) {
+      msgSerializer = new BSPMessageSerializer(conf.getInt(
+          "bsp.checkpoint.port",
+          Integer.parseInt(CheckpointRunner.DEFAULT_PORT)));
+    }
+    this.messageSerializer = msgSerializer;
+  }
+
+  /**
+   * Starts the RPC server on this peer's address and opens the ZooKeeper
+   * connection, registering this peer as its watcher. Failures of either
+   * step are logged and not propagated.
+   */
+  public void reinitialize() {
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("reinitialize(): " + getPeerName());
+      }
+      final String host = peerAddress.getHostName();
+      final int port = peerAddress.getPort();
+      this.server = RPC.getServer(this, host, port, conf);
+      server.start();
+      LOG.info(" BSPPeer address:" + peerAddress.getHostName() + " port:"
+          + peerAddress.getPort());
+    } catch (IOException e) {
+      LOG.error("Fail to start RPC server!", e);
+    }
+
+    try {
+      final int sessionTimeout = conf.getInt(
+          Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000);
+      this.zk = new ZooKeeper(quorumServers, sessionTimeout, this);
+    } catch (IOException e) {
+      LOG.error("Fail while reinitializing zookeeeper!", e);
+    }
+  }
+
+  /**
+   * Removes and returns the head of the current superstep's local queue.
+   * 
+   * @return the next message, or null when the queue is empty.
+   */
+  @Override
+  public BSPMessage getCurrentMessage() throws IOException {
+    final BSPMessage next = this.localQueue.poll();
+    return next;
+  }
+
+  /**
+   * Queues a message for a peer. A message addressed to this peer goes
+   * straight into the queue for the next superstep; any other message is
+   * stored in the outgoing queue of the target address until sync() runs.
+   * 
+   * @param peerName the target peer as "host:port".
+   * @param msg the message to deliver.
+   * @throws IOException declared by the interface.
+   * @see org.apache.hama.bsp.BSPPeerInterface#send(String, BSPMessage)
+   */
+  @Override
+  public void send(String peerName, BSPMessage msg) throws IOException {
+    if (peerName.equals(getPeerName())) {
+      // Guarded like the other debug statements in this class, so the
+      // message payload is only stringified when it is actually logged.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Local send bytes (" + msg.getData().toString() + ")");
+      }
+      localQueueForNextIteration.add(msg);
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Send bytes (" + msg.getData().toString() + ") to "
+            + peerName);
+      }
+      // Single lookup instead of containsKey() followed by get().
+      InetSocketAddress targetPeerAddress = peerSocketCache.get(peerName);
+      if (targetPeerAddress == null) {
+        targetPeerAddress = getAddress(peerName);
+        peerSocketCache.put(peerName, targetPeerAddress);
+      }
+      ConcurrentLinkedQueue<BSPMessage> queue = outgoingQueues
+          .get(targetPeerAddress);
+      if (queue == null) {
+        queue = new ConcurrentLinkedQueue<BSPMessage>();
+      }
+      queue.add(msg);
+      // Re-registered on every send, so the queue is back in the map even if
+      // the map was cleared after the lookup above.
+      outgoingQueues.put(targetPeerAddress, queue);
+    }
+  }
+
+  private String checkpointedPath() {
+    String backup = conf.get("bsp.checkpoint.prefix_path", "/checkpoint/");
+    String ckptPath = backup + jobConf.getJobID().toString() + "/"
+        + getSuperstepCount() + "/" + this.taskid.toString();
+    if (LOG.isDebugEnabled())
+      LOG.debug("Messages are to be saved to " + ckptPath);
+    return ckptPath;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see org.apache.hama.bsp.BSPPeerInterface#sync()
+   */
+  @Override
+  public void sync() throws IOException, KeeperException, InterruptedException {
+    enterBarrier();
+    Iterator<Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>> it = this.outgoingQueues
+        .entrySet().iterator();
+
+    while (it.hasNext()) {
+      Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> entry = it
+          .next();
+
+      BSPPeerInterface peer = peers.get(entry.getKey());
+      if (peer == null) {
+        try {
+          peer = getBSPPeerConnection(entry.getKey());
+        } catch (NullPointerException ne) {
+          umbilical.fatalError(taskid, entry.getKey().getHostName()
+              + " doesn't exists.");
+        }
+      }
+      Iterable<BSPMessage> messages = entry.getValue();
+      BSPMessageBundle bundle = new BSPMessageBundle();
+      for (BSPMessage message : messages) {
+        bundle.addMessage(message);
+      }
+
+      // checkpointing
+      if (null != this.messageSerializer) {
+        this.messageSerializer.serialize(new BSPSerializableMessage(
+            checkpointedPath(), bundle));
+      }
+
+      peer.put(bundle);
+    }
+
+    leaveBarrier();
+    currentTaskStatus.incrementSuperstepCount();
+    umbilical.statusUpdate(taskid, currentTaskStatus);
+
+    // Clear outgoing queues.
+    clearOutgoingQueues();
+
+    // Add non-processed messages from this iteration for the next's queue.
+    while (!localQueue.isEmpty()) {
+      BSPMessage message = localQueue.poll();
+      localQueueForNextIteration.add(message);
+    }
+    // Switch local queues.
+    localQueue = localQueueForNextIteration;
+    localQueueForNextIteration = new ConcurrentLinkedQueue<BSPMessage>();
+  }
+
+  private void createZnode(final String path) throws KeeperException,
+      InterruptedException {
+    createZnode(path, CreateMode.PERSISTENT);
+  }
+
+  private void createEphemeralZnode(final String path) throws KeeperException,
+      InterruptedException {
+    createZnode(path, CreateMode.EPHEMERAL);
+  }
+
+  private void createZnode(final String path, final CreateMode mode)
+      throws KeeperException, InterruptedException {
+    synchronized (zk) {
+      Stat s = zk.exists(path, false);
+      if (null == s) {
+        try {
+          zk.create(path, null, Ids.OPEN_ACL_UNSAFE, mode);
+        } catch (KeeperException.NodeExistsException nee) {
+          LOG.warn("Ignore because znode may be already created at " + path,
+              nee);
+        }
+      }
+    }
+  }
+
+  /**
+   * Watcher registered by enterBarrier() on the superstep's "ready" znode.
+   * It records that an event arrived and wakes up every thread waiting on
+   * the shared mutex.
+   */
+  private class BarrierWatcher implements Watcher {
+    // Written by the thread delivering the event and polled by
+    // enterBarrier() outside the mutex, so it must be volatile for the
+    // polling loop to be guaranteed to see the update.
+    private volatile boolean complete = false;
+
+    /** @return true once any event has been delivered to this watcher. */
+    boolean isComplete() {
+      return this.complete;
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+      this.complete = true;
+      synchronized (mutex) {
+        LOG.info(">>>>>>>>>>>>>>> at superstep " + getSuperstepCount()
+            + " taskid:" + taskid.toString() + " is notified.");
+        // An earlier revision also deleted the "ready" znode at this point;
+        // that step was disabled and is intentionally not performed.
+        mutex.notifyAll();
+      }
+    }
+  }
+
+  /**
+   * Enters the ZooKeeper barrier of the current superstep.
+   * 
+   * Makes sure the root, job and superstep znodes exist, registers a
+   * BarrierWatcher on the superstep's "ready" znode and creates this task's
+   * own ephemeral znode. If the superstep znode then has fewer children
+   * (not counting "ready") than jobConf.getNumBspTask(), the method waits
+   * until the watcher has fired; otherwise this task creates the "ready"
+   * znode itself.
+   * 
+   * @return always true.
+   * @throws KeeperException if a ZooKeeper operation fails.
+   * @throws InterruptedException if interrupted while waiting or inside a
+   *           ZooKeeper call.
+   */
+  protected boolean enterBarrier() throws KeeperException, InterruptedException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("[" + getPeerName() + "] enter the enterbarrier: "
+          + this.getSuperstepCount());
+    }
+
+    // The whole entry protocol runs while holding the zk monitor; the
+    // createZnode() helpers lock the same object again.
+    synchronized (zk) {
+      createZnode(bspRoot);
+      final String pathToJobIdZnode = bspRoot + "/"
+          + taskid.getJobID().toString();
+      createZnode(pathToJobIdZnode);
+      final String pathToSuperstepZnode = pathToJobIdZnode + "/"
+          + getSuperstepCount();
+      createZnode(pathToSuperstepZnode);
+      // The watch is registered before this task's own znode is created.
+      BarrierWatcher barrierWatcher = new BarrierWatcher();
+      Stat readyStat = zk.exists(pathToSuperstepZnode + "/ready",
+          barrierWatcher);
+      zk.create(getNodeName(), null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+
+      List<String> znodes = zk.getChildren(pathToSuperstepZnode, false);
+      int size = znodes.size(); // may contains ready
+      boolean hasReady = znodes.contains("ready");
+      if (hasReady) {
+        size--;
+      }
+
+      LOG.debug("===> at superstep :" + getSuperstepCount()
+          + " current znode size: " + znodes.size() + " current znodes:"
+          + znodes);
+
+      if (LOG.isDebugEnabled())
+        LOG.debug("enterBarrier() znode size within " + pathToSuperstepZnode
+            + " is " + znodes.size() + ". Znodes include " + znodes);
+
+      if (size < jobConf.getNumBspTask()) {
+        LOG.info("xxxx 1. At superstep: " + getSuperstepCount()
+            + " which task is waiting? " + taskid.toString()
+            + " stat is null? " + readyStat);
+        // Poll the watcher flag, sleeping on the mutex for at most one
+        // second per round.
+        // NOTE(review): when hasReady is true nothing in the loop body
+        // blocks, so this spins until the watcher fires - confirm that is
+        // intended.
+        while (!barrierWatcher.isComplete()) {
+          if (!hasReady) {
+            synchronized (mutex) {
+              mutex.wait(1000);
+            }
+          }
+        }
+        LOG.debug("xxxx 2. at superstep: " + getSuperstepCount()
+            + " after waiting ..." + taskid.toString());
+      } else {
+        // All expected children are present: this task publishes "ready".
+        LOG.debug("---> at superstep: " + getSuperstepCount()
+            + " task that is creating /ready znode:" + taskid.toString());
+        createEphemeralZnode(pathToSuperstepZnode + "/ready");
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Leaves the ZooKeeper barrier of the current superstep.
+   * 
+   * Loops over the children of the superstep znode (ignoring "ready"):
+   * returns when none are left, or when exactly one is left after trying to
+   * delete this task's own znode. Otherwise the children are sorted; the
+   * task owning the lowest name watches the highest one and waits, while
+   * every other task deletes its own znode, then watches the lowest one and
+   * waits. Each wake-up re-reads the children.
+   * 
+   * @return always true.
+   * @throws KeeperException if a ZooKeeper operation fails.
+   * @throws InterruptedException if interrupted while waiting or inside a
+   *           ZooKeeper call.
+   */
+  protected boolean leaveBarrier() throws KeeperException, InterruptedException {
+    final String pathToSuperstepZnode = bspRoot + "/"
+        + taskid.getJobID().toString() + "/" + getSuperstepCount();
+    while (true) {
+      List<String> znodes = zk.getChildren(pathToSuperstepZnode, false);
+      LOG.info("leaveBarrier() !!! checking znodes cotnains /ready node or not: at superstep:"
+          + getSuperstepCount() + " znode:" + znodes);
+      // "ready" is a marker, not a participant; drop it before counting.
+      if (znodes.contains("ready")) {
+        znodes.remove("ready");
+      }
+      final int size = znodes.size();
+      LOG.info("leaveBarrier() at superstep:" + getSuperstepCount()
+          + " znode size: (" + size + ") znodes:" + znodes);
+      // NOTE(review): znodes has already been dereferenced above, so the
+      // null test below can never be the part that matches.
+      if (null == znodes || znodes.isEmpty())
+        return true;
+      if (1 == size) {
+        // NOTE(review): this assumes the single remaining child is this
+        // task's own znode; a missing node is tolerated.
+        try {
+          zk.delete(getNodeName(), 0);
+        } catch (KeeperException.NoNodeException nne) {
+          LOG.warn(
+              "+++ (znode size is 1). Ignore because znode may disconnect.",
+              nne);
+        }
+        return true;
+      }
+      Collections.sort(znodes);
+
+      final String lowest = znodes.get(0);
+      final String highest = znodes.get(size - 1);
+
+      LOG.info("leaveBarrier() at superstep: " + getSuperstepCount()
+          + " taskid:" + taskid.toString() + " lowest: " + lowest + " highest:"
+          + highest);
+      // The watches are registered while holding the mutex that their
+      // callbacks also need before they can call notifyAll().
+      synchronized (mutex) {
+
+        if (getNodeName().equals(pathToSuperstepZnode + "/" + lowest)) {
+          // This task owns the lowest znode: watch the highest one.
+          Stat s = zk.exists(pathToSuperstepZnode + "/" + highest,
+              new Watcher() {
+                @Override
+                public void process(WatchedEvent event) {
+                  synchronized (mutex) {
+                    LOG.debug("leaveBarrier() at superstep: "
+                        + getSuperstepCount() + " taskid:" + taskid.toString()
+                        + " highest notify lowest.");
+                    mutex.notifyAll();
+                  }
+                }
+              });
+
+          // Wait only if the highest znode still exists. The wait has no
+          // timeout, so it ends only through a notify on the mutex (or an
+          // interrupt).
+          if (null != s) {
+            LOG.debug("leaveBarrier(): superstep:" + getSuperstepCount()
+                + " taskid:" + taskid.toString() + " wait for higest notify.");
+            mutex.wait();
+          }
+        } else {
+          // Every other task first removes its own znode, if still there.
+          Stat s1 = zk.exists(getNodeName(), false);
+
+          if (null != s1) {
+            LOG.info("leaveBarrier() znode at superstep:" + getSuperstepCount()
+                + " taskid:" + taskid.toString() + " exists, so delete it.");
+            try {
+              zk.delete(getNodeName(), 0);
+            } catch (KeeperException.NoNodeException nne) {
+              LOG.warn("++++ Ignore because node may be dleted.", nne);
+            }
+          }
+
+          // Then it watches the lowest znode and waits while it exists.
+          Stat s2 = zk.exists(pathToSuperstepZnode + "/" + lowest,
+              new Watcher() {
+                @Override
+                public void process(WatchedEvent event) {
+                  synchronized (mutex) {
+                    LOG.debug("leaveBarrier() at superstep: "
+                        + getSuperstepCount() + " taskid:" + taskid.toString()
+                        + " lowest notify other nodes.");
+                    mutex.notifyAll();
+                  }
+                }
+              });
+          if (null != s2) {
+            LOG.debug("leaveBarrier(): superstep:" + getSuperstepCount()
+                + " taskid:" + taskid.toString() + " wait for lowest notify.");
+            mutex.wait();
+          }
+        }
+      }
+    }
+  }
+
+  private String getNodeName() {
+    return bspRoot + "/" + taskid.getJobID().toString() + "/"
+        + getSuperstepCount() + "/" + taskid.toString();
+  }
+
+  /**
+   * Watcher callback of the peer itself: wakes up a single thread waiting on
+   * the barrier mutex. The event content is not inspected.
+   */
+  @Override
+  public void process(WatchedEvent event) {
+    final Object monitor = mutex;
+    synchronized (monitor) {
+      monitor.notify();
+    }
+  }
+
+  /**
+   * Empties the current local queue and drops all outgoing queues.
+   * 
+   * NOTE(review): the queue for the next iteration is not touched here -
+   * confirm that this is intended.
+   */
+  public void clear() {
+    localQueue.clear();
+    outgoingQueues.clear();
+  }
+
+  /**
+   * Clears the queues and releases the ZooKeeper connection, the RPC server
+   * and the checkpoint serializer. Each resource is released only if it was
+   * created, and the later ones are released even if an earlier step fails.
+   */
+  @Override
+  public void close() throws IOException {
+    this.clear();
+    try {
+      // zk stays null when reinitialize() was never called or failed.
+      if (zk != null) {
+        zk.close();
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while closing the ZooKeeper connection.", e);
+      // Restore the flag so callers can still observe the interruption.
+      Thread.currentThread().interrupt();
+    } finally {
+      try {
+        if (server != null) {
+          server.stop();
+        }
+      } finally {
+        if (null != messageSerializer) {
+          this.messageSerializer.close();
+        }
+      }
+    }
+  }
+
+  /**
+   * Appends a single message to the queue that becomes readable after the
+   * next sync().
+   */
+  @Override
+  public void put(BSPMessage msg) throws IOException {
+    localQueueForNextIteration.add(msg);
+  }
+
+  /**
+   * Appends every message of the bundle, in iteration order, to the queue
+   * that becomes readable after the next sync().
+   */
+  @Override
+  public void put(BSPMessageBundle messages) throws IOException {
+    for (BSPMessage received : messages.getMessages()) {
+      localQueueForNextIteration.add(received);
+    }
+  }
+
+  /**
+   * @return BSPPeerInterface.versionID, regardless of the arguments.
+   */
+  @Override
+  public long getProtocolVersion(String arg0, long arg1) throws IOException {
+    final long version = BSPPeerInterface.versionID;
+    return version;
+  }
+
+  /**
+   * Returns the RPC proxy for the given peer address, creating and caching
+   * it on first use.
+   * 
+   * @param addr the address of the remote peer.
+   * @return the cached or newly created proxy; never null.
+   * @throws NullPointerException if no proxy could be created for the
+   *           address (or the address itself is null).
+   */
+  protected BSPPeerInterface getBSPPeerConnection(InetSocketAddress addr)
+      throws NullPointerException {
+    synchronized (this.peers) {
+      BSPPeerInterface peer = peers.get(addr);
+
+      if (peer == null) {
+        try {
+          peer = (BSPPeerInterface) RPC.getProxy(BSPPeerInterface.class,
+              BSPPeerInterface.versionID, addr, this.conf);
+        } catch (IOException e) {
+          LOG.error("Fail to create RPC proxy for peer " + addr, e);
+        }
+        if (peer == null) {
+          // Fail with a descriptive message instead of relying on the
+          // concurrent map rejecting a null value.
+          throw new NullPointerException(
+              "Unable to obtain a connection to peer " + addr);
+        }
+        this.peers.put(addr, peer);
+      }
+
+      return peer;
+    }
+  }
+
+  /**
+   * @return this peer's name, formatted as host:port of its bind address.
+   */
+  public String getPeerName() {
+    final String host = peerAddress.getHostName();
+    final int port = peerAddress.getPort();
+    return host + ":" + port;
+  }
+
+  private InetSocketAddress getAddress(String peerName) {
+    String[] peerAddrParts = peerName.split(":");
+    if (peerAddrParts.length != 2) {
+      throw new ArrayIndexOutOfBoundsException(
+          "Peername must consist of exactly ONE \":\"! Given peername was: "
+              + peerName);
+    }
+    return new InetSocketAddress(peerAddrParts[0],
+        Integer.parseInt(peerAddrParts[1]));
+  }
+
+  @Override
+  public String[] getAllPeerNames() {
+    String[] result = null;
+    try {
+      result = zk.getChildren("/" + jobConf.getJobID().toString(), this)
+          .toArray(new String[0]);
+    } catch (KeeperException e) {
+      e.printStackTrace();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+
+    return result;
+  }
+
+  /**
+   * @return how many messages are waiting in the current local queue.
+   */
+  public int getNumCurrentMessages() {
+    return this.localQueue.size();
+  }
+
+  /**
+   * Replaces the task status from which this peer reads, and in sync()
+   * advances, the superstep count.
+   * 
+   * @param currentTaskStatus the status object to use from now on.
+   */
+  public void setCurrentTaskStatus(TaskStatus currentTaskStatus) {
+    final TaskStatus status = currentTaskStatus;
+    this.currentTaskStatus = status;
+  }
+
+  /**
+   * @return the superstep count held by the current task status.
+   */
+  public long getSuperstepCount() {
+    return this.currentTaskStatus.getSuperstepCount();
+  }
+
+  /**
+   * Sets the job this peer works for. The job supplies the job id and the
+   * number of BSP tasks used by the barrier and checkpoint code.
+   * 
+   * @param jobConf the job configuration to use from now on.
+   */
+  public void setJobConf(BSPJob jobConf) {
+    final BSPJob job = jobConf;
+    this.jobConf = job;
+  }
+
+  /**
+   * @return the number of messages in the current local queue (the same
+   *         value as getNumCurrentMessages()).
+   */
+  public int getLocalQueueSize() {
+    return this.localQueue.size();
+  }
+
+  /**
+   * @return the number of target addresses that currently have an outgoing
+   *         queue; this counts queues, not the messages inside them.
+   */
+  public int getOutgoingQueueSize() {
+    return this.outgoingQueues.size();
+  }
+
+  /**
+   * Removes every message from the current local queue.
+   */
+  public void clearLocalQueue() {
+    localQueue.clear();
+  }
+
+  /**
+   * Drops all outgoing queues together with the messages they hold.
+   */
+  public void clearOutgoingQueues() {
+    outgoingQueues.clear();
+  }
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPPeerInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPPeerInterface.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPPeerInterface.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPPeerInterface.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hama.Constants;
+import org.apache.hama.ipc.HamaRPCProtocolVersion;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Communication interface of a BSP peer: message passing between peers,
+ * access to the received messages and barrier synchronization.
+ */
+public interface BSPPeerInterface extends HamaRPCProtocolVersion, Closeable,
+    Constants {
+
+  /**
+   * Sends a message to the peer with the given name. Messages sent through
+   * this method are not guaranteed to arrive in the order they were sent.
+   * 
+   * @param peerName the name of the target peer.
+   * @param msg the message to send.
+   * @throws IOException
+   */
+  void send(String peerName, BSPMessage msg) throws IOException;
+
+  /**
+   * Puts a single message into the local queue.
+   * 
+   * @param msg the message to store.
+   * @throws IOException
+   */
+  void put(BSPMessage msg) throws IOException;
+
+  /**
+   * Puts a whole bundle of messages into the local queue.
+   * 
+   * @param messages the bundle whose messages are stored.
+   * @throws IOException
+   */
+  void put(BSPMessageBundle messages) throws IOException;
+
+  /**
+   * @return a message from the peer's queue of received messages (a FIFO).
+   * @throws IOException
+   */
+  BSPMessage getCurrentMessage() throws IOException;
+
+  /**
+   * @return the number of messages in the peer's queue of received messages.
+   */
+  int getNumCurrentMessages();
+
+  /**
+   * Barrier synchronization.
+   * 
+   * Sends all messages held in the outgoing message queues to the
+   * corresponding remote peers.
+   * 
+   * @throws IOException
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  void sync() throws IOException, KeeperException, InterruptedException;
+
+  /**
+   * @return the count of the current super-step.
+   */
+  long getSuperstepCount();
+
+  /**
+   * @return the name of this peer in the format "hostname:port".
+   */
+  String getPeerName();
+
+  /**
+   * @return the names of all peers executing tasks of the same job,
+   *         including this peer.
+   */
+  String[] getAllPeerNames();
+
+  /**
+   * Clears the entries of all queues.
+   */
+  void clear();
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPPeerInterface.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPTask.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPTask.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPTask.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.ipc.BSPPeerProtocol;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Base class for tasks. 
+ */
+public class BSPTask extends Task {
+  
+  public static final Log LOG = LogFactory.getLog(BSPTask.class);
+  
+  private BSPJob conf;
+  
+  public BSPTask() {
+  }
+
+  public BSPTask(BSPJobID jobId, String jobFile, TaskAttemptID taskid, int partition) {
+    this.jobId = jobId;
+    this.jobFile = jobFile;
+    this.taskId = taskid;
+    this.partition = partition;
+  }
+
+  @Override
+  public BSPTaskRunner createRunner(GroomServer groom) {
+    return new BSPTaskRunner(this, groom, this.conf);
+  }
+
+  @Override
+  public void run(BSPJob job, BSPPeer bspPeer, BSPPeerProtocol umbilical)
+      throws IOException {
+    
+    BSP bsp = (BSP) ReflectionUtils.newInstance(job.getConf().getClass(
+        "bsp.work.class", BSP.class), job.getConf());
+
+    try {
+      bsp.bsp(bspPeer);
+    } catch (IOException e) {
+      LOG.error("Exception during BSP execution!", e);
+    } catch (KeeperException e) {
+      LOG.error("Exception during BSP execution!", e);
+    } catch (InterruptedException e) {
+      LOG.error("Exception during BSP execution!", e);
+    }
+
+    done(umbilical);
+  }
+  
+  public BSPJob getConf() {
+      return conf;
+    }
+  
+    public void setConf(BSPJob conf) {
+      this.conf = conf;
+    }
+
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPTask.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPTaskRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPTaskRunner.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPTaskRunner.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPTaskRunner.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * {@link TaskRunner} specialisation for {@link BSPTask}s. It adds no behaviour
+ * of its own; everything is inherited from {@link TaskRunner}.
+ */
+public class BSPTaskRunner extends TaskRunner {
+
+  public static final Log LOG = LogFactory.getLog(BSPTaskRunner.class);
+
+  /**
+   * Forwards all arguments unchanged to the {@link TaskRunner} constructor.
+   * 
+   * @param bspTask the task to run.
+   * @param groom the groom server the task runs on.
+   * @param conf the job the task belongs to.
+   */
+  public BSPTaskRunner(BSPTask bspTask, GroomServer groom, BSPJob conf) {
+    super(bspTask, groom, conf);
+  }
+
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPTaskRunner.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BooleanMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BooleanMessage.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BooleanMessage.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BooleanMessage.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A {@link BSPMessage} carrying a string tag together with a boolean payload.
+ */
+public class BooleanMessage extends BSPMessage {
+
+  /** Tag of the message; stays <code>null</code> until set or deserialized. */
+  String tag;
+  /** Boolean payload of the message. */
+  boolean data;
+
+  /** Creates an empty message, to be filled by {@link #readFields(DataInput)}. */
+  public BooleanMessage() {
+    super();
+  }
+
+  /**
+   * Creates a message with the given tag and payload.
+   * 
+   * @param tag the tag of the message.
+   * @param data the boolean payload.
+   */
+  public BooleanMessage(String tag, boolean data) {
+    super();
+    this.data = data;
+    this.tag = tag;
+  }
+
+  /** @return the tag of this message. */
+  @Override
+  public String getTag() {
+    return this.tag;
+  }
+
+  /** @return the payload of this message, boxed as a {@link Boolean}. */
+  @Override
+  public Boolean getData() {
+    return this.data;
+  }
+
+  /** Writes the tag as a UTF string followed by the boolean payload. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(this.tag);
+    out.writeBoolean(this.data);
+  }
+
+  /** Reads the tag and the payload in the order {@link #write} emits them. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.tag = in.readUTF();
+    this.data = in.readBoolean();
+  }
+}
\ No newline at end of file

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BooleanMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/ByteMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/ByteMessage.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/ByteMessage.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/ByteMessage.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A {@link BSPMessage} whose tag and payload are both raw byte arrays.
+ */
+public class ByteMessage extends BSPMessage {
+
+  private byte[] tag;
+  private byte[] data;
+
+  /** Creates an empty message, to be filled by {@link #readFields(DataInput)}. */
+  public ByteMessage() {
+    super();
+  }
+
+  /**
+   * Creates a message that references (does not copy) the given arrays.
+   * 
+   * @param tag the tag bytes.
+   * @param data the payload bytes.
+   */
+  public ByteMessage(byte[] tag, byte[] data) {
+    super();
+    this.data = data;
+    this.tag = tag;
+  }
+
+  /** @return the tag array held by this message (not a copy). */
+  @Override
+  public byte[] getTag() {
+    return tag;
+  }
+
+  /** @return the payload array held by this message (not a copy). */
+  @Override
+  public byte[] getData() {
+    return data;
+  }
+
+  /**
+   * Reads a length-prefixed tag followed by a length-prefixed payload, i.e.
+   * the layout produced by {@link #write(DataOutput)}.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    final int tagLength = in.readInt();
+    this.tag = new byte[tagLength];
+    in.readFully(this.tag, 0, tagLength);
+
+    final int dataLength = in.readInt();
+    this.data = new byte[dataLength];
+    in.readFully(this.data, 0, dataLength);
+  }
+
+  /** Writes the tag and then the payload, each preceded by its int length. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    final byte[] tagBytes = this.tag;
+    out.writeInt(tagBytes.length);
+    out.write(tagBytes);
+
+    final byte[] dataBytes = this.data;
+    out.writeInt(dataBytes.length);
+    out.write(dataBytes);
+  }
+
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/ByteMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/ClusterStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/ClusterStatus.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/ClusterStatus.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/ClusterStatus.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Status information on the current state of the BSP cluster.
+ * 
+ * <p>
+ * <code>ClusterStatus</code> provides clients with information such as:
+ * </p>
+ * <ol>
+ * <li>Size of the cluster.</li>
+ * <li>Name of the grooms.</li>
+ * <li>Task capacity of the cluster.</li>
+ * <li>The number of currently running bsp tasks.</li>
+ * <li>State of the <code>BSPMaster</code>.</li>
+ * </ol>
+ * 
+ * <p>
+ * Clients can query for the latest <code>ClusterStatus</code>, via
+ * {@link BSPJobClient#getClusterStatus(boolean)}.
+ * </p>
+ * 
+ * @see BSPMaster
+ */
+public class ClusterStatus implements Writable {
+
+  /** Number of active grooms; used on its own when no groom map is present. */
+  private int numActiveGrooms;
+  /** Status of every active groom, keyed by groom name. May be empty. */
+  private Map<String, GroomServerStatus> activeGrooms = new HashMap<String, GroomServerStatus>();
+  /** Lazily built groom name to host name view of {@link #activeGrooms}. */
+  private Map<String, String> cachedActiveGroomNames = null;
+  private int tasks;
+  private int maxTasks;
+  private BSPMaster.State state;
+
+  /**
+   * Creates an empty status, to be filled by {@link #readFields(DataInput)}.
+   */
+  public ClusterStatus() {
+  }
+
+  /**
+   * Creates a summary status that carries only the number of grooms.
+   * 
+   * @param grooms the number of active groom servers.
+   * @param tasks the number of currently running tasks.
+   * @param maxTasks the maximum number of tasks the cluster can run.
+   * @param state the state of the <code>BSPMaster</code>.
+   */
+  public ClusterStatus(int grooms, int tasks, int maxTasks,
+      BSPMaster.State state) {
+    this.numActiveGrooms = grooms;
+    this.tasks = tasks;
+    this.maxTasks = maxTasks;
+    this.state = state;
+  }
+
+  /**
+   * Creates a detailed status that carries the status of every active groom.
+   * The groom count is taken from the size of the given map.
+   * 
+   * @param activeGrooms status of the active grooms, keyed by groom name.
+   * @param tasks the number of currently running tasks.
+   * @param maxTasks the maximum number of tasks the cluster can run.
+   * @param state the state of the <code>BSPMaster</code>.
+   */
+  public ClusterStatus(Map<String, GroomServerStatus> activeGrooms, int tasks,
+      int maxTasks, BSPMaster.State state) {
+    this(activeGrooms.size(), tasks, maxTasks, state);
+    this.activeGrooms = activeGrooms;
+  }
+
+  /**
+   * Get the number of groom servers in the cluster.
+   * 
+   * @return the number of groom servers in the cluster.
+   */
+  public int getGroomServers() {
+    return numActiveGrooms;
+  }
+
+  /**
+   * Get the names of groom servers, and their hostnames, in the cluster. The
+   * map is computed on first use and cached until the next
+   * {@link #readFields(DataInput)}.
+   * 
+   * @return the active groom servers in the cluster.
+   */
+  public Map<String, String> getActiveGroomNames() {
+    if (cachedActiveGroomNames == null) {
+      if (activeGrooms != null) {
+        Map<String, String> map = new HashMap<String, String>();
+        for (Entry<String, GroomServerStatus> entry : activeGrooms.entrySet()) {
+          map.put(entry.getKey(), entry.getValue().getGroomHostName());
+        }
+        cachedActiveGroomNames = map;
+      }
+    }
+    return cachedActiveGroomNames;
+  }
+
+  /**
+   * Get the names of groom servers, and their current status in the cluster.
+   * 
+   * @return the active groom servers in the cluster.
+   */
+  public Map<String, GroomServerStatus> getActiveGroomServerStatus() {
+    return activeGrooms;
+  }
+
+  /**
+   * Get the number of currently running tasks in the cluster.
+   * 
+   * @return the number of currently running tasks in the cluster.
+   */
+  public int getTasks() {
+    return tasks;
+  }
+
+  /**
+   * Get the maximum capacity for running tasks in the cluster.
+   * 
+   * @return the maximum capacity for running tasks in the cluster.
+   */
+  public int getMaxTasks() {
+    return maxTasks;
+  }
+
+  /**
+   * Get the current state of the <code>BSPMaster</code>, as
+   * {@link BSPMaster.State}
+   * 
+   * @return the current state of the <code>BSPMaster</code>.
+   */
+  public BSPMaster.State getBSPMasterState() {
+    return state;
+  }
+
+  // ////////////////////////////////////////////
+  // Writable
+  // ////////////////////////////////////////////
+  /**
+   * Writes the groom count, a flag telling whether per-groom entries follow,
+   * the entries themselves (groom name plus status) when present, and finally
+   * the task count, the task capacity and the master state.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    if (activeGrooms.isEmpty()) {
+      out.writeInt(numActiveGrooms);
+      out.writeBoolean(false);
+    } else {
+      out.writeInt(activeGrooms.size());
+      out.writeBoolean(true);
+
+      for (Entry<String, GroomServerStatus> entry : activeGrooms.entrySet()) {
+        out.writeUTF(entry.getKey());
+        entry.getValue().write(out);
+      }
+
+    }
+    out.writeInt(tasks);
+    out.writeInt(maxTasks);
+    WritableUtils.writeEnum(out, state);
+  }
+
+  /**
+   * Reads the layout produced by {@link #write(DataOutput)}, replacing all
+   * groom information previously held by this instance.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    numActiveGrooms = in.readInt();
+    boolean groomListFollows = in.readBoolean();
+
+    // The instance may be reused for several reads: drop the name cache so
+    // getActiveGroomNames() is rebuilt from the grooms read below.
+    cachedActiveGroomNames = null;
+
+    if (groomListFollows) {
+      activeGrooms = new HashMap<String, GroomServerStatus>(numActiveGrooms);
+
+      for (int i = 0; i < numActiveGrooms; i++) {
+        final String groomName = in.readUTF();
+        final GroomServerStatus status = new GroomServerStatus();
+        status.readFields(in);
+        activeGrooms.put(groomName, status);
+      }
+    } else {
+      // No per-groom entries in the stream: do not keep the ones left over
+      // from an earlier read or from construction.
+      activeGrooms = new HashMap<String, GroomServerStatus>();
+    }
+
+    tasks = in.readInt();
+    maxTasks = in.readInt();
+    state = WritableUtils.readEnum(in, BSPMaster.State.class);
+  }
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/ClusterStatus.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/CommitTaskAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/CommitTaskAction.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/CommitTaskAction.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/CommitTaskAction.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A {@link GroomServerAction} of type <code>COMMIT_TASK</code>: the directive
+ * sent from the {@link org.apache.hama.bsp.BSPMaster} to the
+ * {@link org.apache.hama.bsp.GroomServer} to commit the output of a task.
+ */
+class CommitTaskAction extends GroomServerAction {
+
+  /** Attempt whose output is to be committed. */
+  private TaskAttemptID taskId;
+
+  /**
+   * Creates an action holding a fresh {@link TaskAttemptID}, ready to be
+   * filled by {@link #readFields(DataInput)}.
+   */
+  public CommitTaskAction() {
+    super(ActionType.COMMIT_TASK);
+    this.taskId = new TaskAttemptID();
+  }
+
+  /**
+   * Creates an action for the given task attempt.
+   * 
+   * @param taskId the attempt whose output is to be committed.
+   */
+  public CommitTaskAction(TaskAttemptID taskId) {
+    super(ActionType.COMMIT_TASK);
+    this.taskId = taskId;
+  }
+
+  /** @return the attempt whose output is to be committed. */
+  public TaskAttemptID getTaskID() {
+    return this.taskId;
+  }
+
+  /** Writes only the task attempt id. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    this.taskId.write(out);
+  }
+
+  /** Reads the task attempt id into the id object already held. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.taskId.readFields(in);
+  }
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/CommitTaskAction.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/Directive.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/Directive.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/Directive.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/Directive.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A generic directive from the {@link org.apache.hama.bsp.BSPMaster} to the
+ * {@link org.apache.hama.bsp.GroomServer} to take some 'action'.
+ */
+public class Directive implements Writable {
+
+  /** Creation time in milliseconds, or the value read from the stream. */
+  protected long timestamp;
+  /** Kind of directive; <code>null</code> until set or deserialized. */
+  protected Directive.Type type;
+
+  /** Kind of a directive, together with the int code used on the wire. */
+  public static enum Type {
+    Request(1), Response(2);
+    int t;
+
+    Type(int t) {
+      this.t = t;
+    }
+
+    /** @return the int code written to the stream for this type. */
+    public int value() {
+      return this.t;
+    }
+  }
+
+  /** Creates an empty directive, to be filled by {@link #readFields}. */
+  public Directive() {
+  }
+
+  /**
+   * Creates a directive of the given type, stamped with the current time.
+   * 
+   * @param type the kind of directive.
+   */
+  public Directive(Directive.Type type) {
+    this.timestamp = System.currentTimeMillis();
+    this.type = type;
+  }
+
+  /** @return the timestamp of this directive, in milliseconds. */
+  public long getTimestamp() {
+    return this.timestamp;
+  }
+
+  /** @return the kind of this directive. */
+  public Directive.Type getType() {
+    return this.type;
+  }
+
+  /**
+   * Command for BSPMaster or GroomServer to execute.
+  public abstract void execute() throws Exception;
+   */
+
+  /**
+   * Writes the timestamp followed by the int code of the type. The type must
+   * have been set; a directive built with the no-argument constructor cannot
+   * be written before it has been read.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(this.timestamp);
+    out.writeInt(this.type.value());
+  }
+
+  /**
+   * Reads the timestamp and the type code written by {@link #write}.
+   * 
+   * @throws IOException if the stream fails or carries a type code that does
+   *           not belong to any {@link Type}.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.timestamp = in.readLong();
+    int code = in.readInt();
+    if (Directive.Type.Request.value() == code) {
+      this.type = Directive.Type.Request;
+    } else if (Directive.Type.Response.value() == code) {
+      this.type = Directive.Type.Response;
+    } else {
+      // An unknown code means a corrupt or misaligned stream; reporting it
+      // is safer than silently treating the directive as a Response.
+      throw new IOException("Unknown directive type code: " + code);
+    }
+  }
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/Directive.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DirectiveException.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DirectiveException.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DirectiveException.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DirectiveException.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+/**
+ * Unchecked exception declared by {@link DirectiveHandler#handle(Directive)}
+ * for failures that occur while handling a {@link Directive}.
+ */
+public class DirectiveException extends RuntimeException {
+
+  private static final long serialVersionUID = -8052582046894492822L;
+
+  /** Creates an exception without detail message or cause. */
+  public DirectiveException() {
+    super();
+  }
+
+  /**
+   * Creates an exception with a detail message.
+   * 
+   * @param message the detail message.
+   */
+  public DirectiveException(String message) {
+    super(message);
+  }
+
+  /**
+   * Creates an exception with a detail message and an underlying cause.
+   * 
+   * @param message the detail message.
+   * @param t the cause of this exception.
+   */
+  public DirectiveException(String message, Throwable t) {
+    super(message, t);
+  }
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DirectiveException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DirectiveHandler.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DirectiveHandler.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DirectiveHandler.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DirectiveHandler.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+/**
+ * Contract for components that process {@link Directive}s.
+ */
+public interface DirectiveHandler {
+
+  /**
+   * Handles the given directive.
+   * 
+   * @param directive the directive to be handled.
+   * @throws DirectiveException declared for failures while handling the
+   *           directive.
+   */
+  void handle(Directive directive) throws DirectiveException;
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DirectiveHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DispatchTasksDirective.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DispatchTasksDirective.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DispatchTasksDirective.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DispatchTasksDirective.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * A request {@link Directive} that carries the {@link GroomServerAction}s
+ * dispatched between the BSPMaster and the GroomServers.
+ */
+public final class DispatchTasksDirective extends Directive implements Writable {
+
+  public static final Log LOG = LogFactory.getLog(DispatchTasksDirective.class);
+
+  /** Actions carried by this directive; <code>null</code> when there are none. */
+  private GroomServerAction[] actions;
+
+  /** Creates an empty directive, to be filled by {@link #readFields}. */
+  public DispatchTasksDirective() {
+    super();
+  }
+
+  /**
+   * Creates a directive of type <code>Request</code> for the given actions.
+   * 
+   * @param actions the actions to dispatch; the array is not copied.
+   */
+  public DispatchTasksDirective(GroomServerAction[] actions) {
+    super(Directive.Type.Request);
+    this.actions = actions;
+  }
+
+  /** @return the actions carried by this directive, possibly <code>null</code>. */
+  public GroomServerAction[] getActions() {
+    return actions;
+  }
+
+  /**
+   * Writes the {@link Directive} fields, then the number of actions as a
+   * variable-length int, then each action preceded by its action type. A
+   * <code>null</code> action array is written as a count of zero.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    if (this.actions != null) {
+      WritableUtils.writeVInt(out, this.actions.length);
+      for (int i = 0; i < this.actions.length; i++) {
+        final GroomServerAction current = this.actions[i];
+        WritableUtils.writeEnum(out, current.getActionType());
+        current.write(out);
+      }
+    } else {
+      WritableUtils.writeVInt(out, 0);
+    }
+  }
+
+  /**
+   * Reads the layout produced by {@link #write(DataOutput)}. A count that is
+   * not positive leaves the action array <code>null</code>.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    final int count = WritableUtils.readVInt(in);
+    if (count <= 0) {
+      this.actions = null;
+      return;
+    }
+
+    this.actions = new GroomServerAction[count];
+    int index = 0;
+    while (index < count) {
+      // The action type is read first so that the matching action object
+      // can be created before its own fields are read.
+      GroomServerAction.ActionType kind = WritableUtils.readEnum(in,
+          GroomServerAction.ActionType.class);
+      this.actions[index] = GroomServerAction.createAction(kind);
+      this.actions[index].readFields(in);
+      ++index;
+    }
+  }
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DispatchTasksDirective.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DoubleMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DoubleMessage.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DoubleMessage.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DoubleMessage.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A {@link BSPMessage} carrying a string tag together with a double payload.
+ */
+public class DoubleMessage extends BSPMessage {
+
+  private String tag;
+  private double data;
+
+  /** Creates an empty message, to be filled by {@link #readFields(DataInput)}. */
+  public DoubleMessage() {
+    super();
+  }
+
+  /**
+   * Creates a message with the given tag and payload.
+   * 
+   * @param tag the tag of the message.
+   * @param data the double payload.
+   */
+  public DoubleMessage(String tag, double data) {
+    super();
+    this.tag = tag;
+    this.data = data;
+  }
+
+  /** @return the tag of this message. */
+  @Override
+  public String getTag() {
+    return this.tag;
+  }
+
+  /** @return the payload of this message, boxed as a {@link Double}. */
+  @Override
+  public Double getData() {
+    return this.data;
+  }
+
+  /** Writes the tag as a UTF string followed by the double payload. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(this.tag);
+    out.writeDouble(this.data);
+  }
+
+  /** Reads the tag and the payload in the order {@link #write} emits them. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.tag = in.readUTF();
+    this.data = in.readDouble();
+  }
+
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DoubleMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/FCFSQueue.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/FCFSQueue.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/FCFSQueue.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/FCFSQueue.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A named job queue backed by a {@link LinkedBlockingQueue}, so jobs are
+ * handed out in the order in which they were added.
+ */
+class FCFSQueue implements Queue<JobInProgress> {
+
+  public static final Log LOG = LogFactory.getLog(FCFSQueue.class);
+  private final String name;
+  private final BlockingQueue<JobInProgress> queue = new LinkedBlockingQueue<JobInProgress>();
+
+  /**
+   * @param name the name of this queue, also used in log messages.
+   */
+  public FCFSQueue(String name) {
+    this.name = name;
+  }
+
+  /** @return the name given at construction time. */
+  @Override
+  public String getName() {
+    return this.name;
+  }
+
+  /**
+   * Appends a job to the queue. If the calling thread is interrupted the job
+   * is not added; the failure is logged and the interrupt status is restored
+   * so that the caller can still observe it.
+   */
+  @Override
+  public void addJob(JobInProgress job) {
+    try {
+      queue.put(job);
+    } catch (InterruptedException ie) {
+      LOG.error("Fail to add a job to the " + this.name + " queue.", ie);
+      // Catching InterruptedException clears the flag; set it again instead
+      // of hiding the interruption from the code further up the stack.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /** Removes the given job from the queue if it is present. */
+  @Override
+  public void removeJob(JobInProgress job) {
+    queue.remove(job);
+  }
+
+  /**
+   * Takes the job at the head of the queue, waiting until one is available.
+   * 
+   * @return the job at the head of the queue, or <code>null</code> if the
+   *         calling thread was interrupted while waiting; in that case the
+   *         interrupt status is restored.
+   */
+  @Override
+  public JobInProgress removeJob() {
+    try {
+      return queue.take();
+    } catch (InterruptedException ie) {
+      LOG.error("Fail to remove a job from the " + this.name + " queue.", ie);
+      // Keep the interruption visible to the caller.
+      Thread.currentThread().interrupt();
+    }
+    return null;
+  }
+
+  /** @return the live backing collection of queued jobs (not a copy). */
+  @Override
+  public Collection<JobInProgress> jobs() {
+    return queue;
+  }
+
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/FCFSQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native