You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2011/09/27 11:35:48 UTC
svn commit: r1176297 [6/19] - in /incubator/hama/branches/HamaV2: ./ api/
api/target/ api/target/classes/ api/target/classes/META-INF/
api/target/lib/ api/target/maven-archiver/
api/target/maven-shared-archive-resources/ api/target/maven-shared-archive...
Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,711 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hama.Constants;
+import org.apache.hama.checkpoint.CheckpointRunner;
+import org.apache.hama.ipc.BSPPeerProtocol;
+import org.apache.hama.zookeeper.QuorumPeer;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * This class represents a BSP peer.
+ */
+public class BSPPeer implements Watcher, BSPPeerInterface {
+
+ public static final Log LOG = LogFactory.getLog(BSPPeer.class);
+
+ private final Configuration conf;
+ private BSPJob jobConf;
+
+ private volatile Server server = null;
+ private ZooKeeper zk = null;
+ private volatile Integer mutex = 0;
+
+ private final String bspRoot;
+ private final String quorumServers;
+
+ private final Map<InetSocketAddress, BSPPeerInterface> peers = new ConcurrentHashMap<InetSocketAddress, BSPPeerInterface>();
+ private final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>();
+ private ConcurrentLinkedQueue<BSPMessage> localQueue = new ConcurrentLinkedQueue<BSPMessage>();
+ private ConcurrentLinkedQueue<BSPMessage> localQueueForNextIteration = new ConcurrentLinkedQueue<BSPMessage>();
+ private final Map<String, InetSocketAddress> peerSocketCache = new ConcurrentHashMap<String, InetSocketAddress>();
+
+ private InetSocketAddress peerAddress;
+ private TaskStatus currentTaskStatus;
+
+ private TaskAttemptID taskid;
+ private BSPPeerProtocol umbilical;
+
+ private final BSPMessageSerializer messageSerializer;
+
+ public static final class BSPSerializableMessage implements Writable {
+ final AtomicReference<String> path = new AtomicReference<String>();
+ final AtomicReference<BSPMessageBundle> bundle = new AtomicReference<BSPMessageBundle>();
+
+ public BSPSerializableMessage() {
+ }
+
+ public BSPSerializableMessage(final String path,
+ final BSPMessageBundle bundle) {
+ if (null == path)
+ throw new NullPointerException("No path provided for checkpointing.");
+ if (null == bundle)
+ throw new NullPointerException("No data provided for checkpointing.");
+ this.path.set(path);
+ this.bundle.set(bundle);
+ }
+
+ public final String checkpointedPath() {
+ return this.path.get();
+ }
+
+ public final BSPMessageBundle messageBundle() {
+ return this.bundle.get();
+ }
+
+ @Override
+ public final void write(DataOutput out) throws IOException {
+ out.writeUTF(this.path.get());
+ this.bundle.get().write(out);
+ }
+
+ @Override
+ public final void readFields(DataInput in) throws IOException {
+ this.path.set(in.readUTF());
+ BSPMessageBundle pack = new BSPMessageBundle();
+ pack.readFields(in);
+ this.bundle.set(pack);
+ }
+
+ }// serializable message
+
+ final class BSPMessageSerializer {
+ final Socket client;
+ final ScheduledExecutorService sched;
+
+ public BSPMessageSerializer(final int port) {
+ Socket tmp = null;
+ int cnt = 0;
+ do {
+ tmp = init(port);
+ cnt++;
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ie) {
+ LOG.warn("Thread is interrupted.", ie);
+ Thread.currentThread().interrupt();
+ }
+ } while (null == tmp && 10 > cnt);
+ this.client = tmp;
+ if (null == this.client)
+ throw new NullPointerException("Client socket is null.");
+ this.sched = Executors.newScheduledThreadPool(conf.getInt(
+ "bsp.checkpoint.serializer_thread", 10));
+ LOG.info(BSPMessageSerializer.class.getName()
+ + " is ready to serialize message.");
+ }
+
+ private Socket init(final int port) {
+ Socket tmp = null;
+ try {
+ tmp = new Socket("localhost", port);
+ } catch (UnknownHostException uhe) {
+ LOG.error("Unable to connect to BSPMessageDeserializer.", uhe);
+ } catch (IOException ioe) {
+ LOG.warn("Fail to create socket.", ioe);
+ }
+ return tmp;
+ }
+
+ void serialize(final BSPSerializableMessage tmp) throws IOException {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Messages are saved to " + tmp.checkpointedPath());
+ final DataOutput out = new DataOutputStream(client.getOutputStream());
+ this.sched.schedule(new Callable<Object>() {
+ public Object call() throws Exception {
+ tmp.write(out);
+ return null;
+ }
+ }, 0, SECONDS);
+ }
+
+ public void close() {
+ try {
+ this.client.close();
+ this.sched.shutdown();
+ } catch (IOException io) {
+ LOG.error("Fail to close client socket.", io);
+ }
+ }
+
+ }// message serializer
+
+ /**
+ * Protected default constructor for LocalBSPRunner.
+ */
+ protected BSPPeer() {
+ bspRoot = null;
+ quorumServers = null;
+ messageSerializer = null;
+ conf = null;
+ }
+
+ /**
+ * BSPPeer Constructor.
+ *
+ * BSPPeer acts on behalf of clients performing bsp() tasks.
+ *
+ * @param conf is the configuration file containing bsp peer host, port, etc.
+ * @param umbilical is the bsp protocol used to contact its parent process.
+ * @param taskid is the id that current process holds.
+ */
+ public BSPPeer(Configuration conf, TaskAttemptID taskid,
+ BSPPeerProtocol umbilical) throws IOException {
+ this.conf = conf;
+ this.taskid = taskid;
+ this.umbilical = umbilical;
+
+ String bindAddress = conf.get(Constants.PEER_HOST,
+ Constants.DEFAULT_PEER_HOST);
+ int bindPort = conf
+ .getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
+ bspRoot = conf.get(Constants.ZOOKEEPER_ROOT,
+ Constants.DEFAULT_ZOOKEEPER_ROOT);
+ quorumServers = QuorumPeer.getZKQuorumServersString(conf);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Quorum " + quorumServers);
+ peerAddress = new InetSocketAddress(bindAddress, bindPort);
+ BSPMessageSerializer msgSerializer = null;
+ if (this.conf.getBoolean("bsp.checkpoint.enabled", false)) {
+ msgSerializer = new BSPMessageSerializer(conf.getInt(
+ "bsp.checkpoint.port",
+ Integer.parseInt(CheckpointRunner.DEFAULT_PORT)));
+ }
+ this.messageSerializer = msgSerializer;
+ }
+
+ public void reinitialize() {
+ try {
+ if (LOG.isDebugEnabled())
+ LOG.debug("reinitialize(): " + getPeerName());
+ this.server = RPC.getServer(this, peerAddress.getHostName(),
+ peerAddress.getPort(), conf);
+ server.start();
+ LOG.info(" BSPPeer address:" + peerAddress.getHostName() + " port:"
+ + peerAddress.getPort());
+ } catch (IOException e) {
+ LOG.error("Fail to start RPC server!", e);
+ }
+
+ try {
+ this.zk = new ZooKeeper(quorumServers, conf.getInt(
+ Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
+ } catch (IOException e) {
+ LOG.error("Fail while reinitializing zookeeeper!", e);
+ }
+ }
+
+ @Override
+ public BSPMessage getCurrentMessage() throws IOException {
+ return localQueue.poll();
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hama.bsp.BSPPeerInterface#send(java.net.InetSocketAddress,
+ * org.apache.hadoop.io.Writable, org.apache.hadoop.io.Writable)
+ */
+ @Override
+ public void send(String peerName, BSPMessage msg) throws IOException {
+ if (peerName.equals(getPeerName())) {
+ LOG.debug("Local send bytes (" + msg.getData().toString() + ")");
+ localQueueForNextIteration.add(msg);
+ } else {
+ LOG.debug("Send bytes (" + msg.getData().toString() + ") to " + peerName);
+ InetSocketAddress targetPeerAddress = null;
+ // Get socket for target peer.
+ if (peerSocketCache.containsKey(peerName)) {
+ targetPeerAddress = peerSocketCache.get(peerName);
+ } else {
+ targetPeerAddress = getAddress(peerName);
+ peerSocketCache.put(peerName, targetPeerAddress);
+ }
+ ConcurrentLinkedQueue<BSPMessage> queue = outgoingQueues
+ .get(targetPeerAddress);
+ if (queue == null) {
+ queue = new ConcurrentLinkedQueue<BSPMessage>();
+ }
+ queue.add(msg);
+ outgoingQueues.put(targetPeerAddress, queue);
+ }
+ }
+
+ private String checkpointedPath() {
+ String backup = conf.get("bsp.checkpoint.prefix_path", "/checkpoint/");
+ String ckptPath = backup + jobConf.getJobID().toString() + "/"
+ + getSuperstepCount() + "/" + this.taskid.toString();
+ if (LOG.isDebugEnabled())
+ LOG.debug("Messages are to be saved to " + ckptPath);
+ return ckptPath;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hama.bsp.BSPPeerInterface#sync()
+ */
+ @Override
+ public void sync() throws IOException, KeeperException, InterruptedException {
+ enterBarrier();
+ Iterator<Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>> it = this.outgoingQueues
+ .entrySet().iterator();
+
+ while (it.hasNext()) {
+ Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> entry = it
+ .next();
+
+ BSPPeerInterface peer = peers.get(entry.getKey());
+ if (peer == null) {
+ try {
+ peer = getBSPPeerConnection(entry.getKey());
+ } catch (NullPointerException ne) {
+ umbilical.fatalError(taskid, entry.getKey().getHostName()
+ + " doesn't exists.");
+ }
+ }
+ Iterable<BSPMessage> messages = entry.getValue();
+ BSPMessageBundle bundle = new BSPMessageBundle();
+ for (BSPMessage message : messages) {
+ bundle.addMessage(message);
+ }
+
+ // checkpointing
+ if (null != this.messageSerializer) {
+ this.messageSerializer.serialize(new BSPSerializableMessage(
+ checkpointedPath(), bundle));
+ }
+
+ peer.put(bundle);
+ }
+
+ leaveBarrier();
+ currentTaskStatus.incrementSuperstepCount();
+ umbilical.statusUpdate(taskid, currentTaskStatus);
+
+ // Clear outgoing queues.
+ clearOutgoingQueues();
+
+ // Add non-processed messages from this iteration for the next's queue.
+ while (!localQueue.isEmpty()) {
+ BSPMessage message = localQueue.poll();
+ localQueueForNextIteration.add(message);
+ }
+ // Switch local queues.
+ localQueue = localQueueForNextIteration;
+ localQueueForNextIteration = new ConcurrentLinkedQueue<BSPMessage>();
+ }
+
+ private void createZnode(final String path) throws KeeperException,
+ InterruptedException {
+ createZnode(path, CreateMode.PERSISTENT);
+ }
+
+ private void createEphemeralZnode(final String path) throws KeeperException,
+ InterruptedException {
+ createZnode(path, CreateMode.EPHEMERAL);
+ }
+
+ private void createZnode(final String path, final CreateMode mode)
+ throws KeeperException, InterruptedException {
+ synchronized (zk) {
+ Stat s = zk.exists(path, false);
+ if (null == s) {
+ try {
+ zk.create(path, null, Ids.OPEN_ACL_UNSAFE, mode);
+ } catch (KeeperException.NodeExistsException nee) {
+ LOG.warn("Ignore because znode may be already created at " + path,
+ nee);
+ }
+ }
+ }
+ }
+
+ private class BarrierWatcher implements Watcher {
+ private boolean complete = false;
+
+ boolean isComplete() {
+ return this.complete;
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ this.complete = true;
+ synchronized (mutex) {
+ LOG.info(">>>>>>>>>>>>>>> at superstep " + getSuperstepCount()
+ + " taskid:" + taskid.toString() + " is notified.");
+ /*
+ * try { Stat s = zk.exists(pathToSuperstepZnode+"/ready", false);
+ * if(null != s) { zk.delete(pathToSuperstepZnode+"/ready", 0); } }
+ * catch(KeeperException.NoNodeException nne) {
+ * LOG.warn("Ignore because znode may be deleted.", nne); }
+ * catch(Exception e) { throw new RuntimeException(e); }
+ */
+ mutex.notifyAll();
+ }
+ }
+ }
+
+ protected boolean enterBarrier() throws KeeperException, InterruptedException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[" + getPeerName() + "] enter the enterbarrier: "
+ + this.getSuperstepCount());
+ }
+
+ synchronized (zk) {
+ createZnode(bspRoot);
+ final String pathToJobIdZnode = bspRoot + "/"
+ + taskid.getJobID().toString();
+ createZnode(pathToJobIdZnode);
+ final String pathToSuperstepZnode = pathToJobIdZnode + "/"
+ + getSuperstepCount();
+ createZnode(pathToSuperstepZnode);
+ BarrierWatcher barrierWatcher = new BarrierWatcher();
+ Stat readyStat = zk.exists(pathToSuperstepZnode + "/ready",
+ barrierWatcher);
+ zk.create(getNodeName(), null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+
+ List<String> znodes = zk.getChildren(pathToSuperstepZnode, false);
+ int size = znodes.size(); // may contains ready
+ boolean hasReady = znodes.contains("ready");
+ if (hasReady) {
+ size--;
+ }
+
+ LOG.debug("===> at superstep :" + getSuperstepCount()
+ + " current znode size: " + znodes.size() + " current znodes:"
+ + znodes);
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("enterBarrier() znode size within " + pathToSuperstepZnode
+ + " is " + znodes.size() + ". Znodes include " + znodes);
+
+ if (size < jobConf.getNumBspTask()) {
+ LOG.info("xxxx 1. At superstep: " + getSuperstepCount()
+ + " which task is waiting? " + taskid.toString()
+ + " stat is null? " + readyStat);
+ while (!barrierWatcher.isComplete()) {
+ if (!hasReady) {
+ synchronized (mutex) {
+ mutex.wait(1000);
+ }
+ }
+ }
+ LOG.debug("xxxx 2. at superstep: " + getSuperstepCount()
+ + " after waiting ..." + taskid.toString());
+ } else {
+ LOG.debug("---> at superstep: " + getSuperstepCount()
+ + " task that is creating /ready znode:" + taskid.toString());
+ createEphemeralZnode(pathToSuperstepZnode + "/ready");
+ }
+ }
+ return true;
+ }
+
+ protected boolean leaveBarrier() throws KeeperException, InterruptedException {
+ final String pathToSuperstepZnode = bspRoot + "/"
+ + taskid.getJobID().toString() + "/" + getSuperstepCount();
+ while (true) {
+ List<String> znodes = zk.getChildren(pathToSuperstepZnode, false);
+ LOG.info("leaveBarrier() !!! checking znodes cotnains /ready node or not: at superstep:"
+ + getSuperstepCount() + " znode:" + znodes);
+ if (znodes.contains("ready")) {
+ znodes.remove("ready");
+ }
+ final int size = znodes.size();
+ LOG.info("leaveBarrier() at superstep:" + getSuperstepCount()
+ + " znode size: (" + size + ") znodes:" + znodes);
+ if (null == znodes || znodes.isEmpty())
+ return true;
+ if (1 == size) {
+ try {
+ zk.delete(getNodeName(), 0);
+ } catch (KeeperException.NoNodeException nne) {
+ LOG.warn(
+ "+++ (znode size is 1). Ignore because znode may disconnect.",
+ nne);
+ }
+ return true;
+ }
+ Collections.sort(znodes);
+
+ final String lowest = znodes.get(0);
+ final String highest = znodes.get(size - 1);
+
+ LOG.info("leaveBarrier() at superstep: " + getSuperstepCount()
+ + " taskid:" + taskid.toString() + " lowest: " + lowest + " highest:"
+ + highest);
+ synchronized (mutex) {
+
+ if (getNodeName().equals(pathToSuperstepZnode + "/" + lowest)) {
+ Stat s = zk.exists(pathToSuperstepZnode + "/" + highest,
+ new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ synchronized (mutex) {
+ LOG.debug("leaveBarrier() at superstep: "
+ + getSuperstepCount() + " taskid:" + taskid.toString()
+ + " highest notify lowest.");
+ mutex.notifyAll();
+ }
+ }
+ });
+
+ if (null != s) {
+ LOG.debug("leaveBarrier(): superstep:" + getSuperstepCount()
+ + " taskid:" + taskid.toString() + " wait for higest notify.");
+ mutex.wait();
+ }
+ } else {
+ Stat s1 = zk.exists(getNodeName(), false);
+
+ if (null != s1) {
+ LOG.info("leaveBarrier() znode at superstep:" + getSuperstepCount()
+ + " taskid:" + taskid.toString() + " exists, so delete it.");
+ try {
+ zk.delete(getNodeName(), 0);
+ } catch (KeeperException.NoNodeException nne) {
+ LOG.warn("++++ Ignore because node may be dleted.", nne);
+ }
+ }
+
+ Stat s2 = zk.exists(pathToSuperstepZnode + "/" + lowest,
+ new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ synchronized (mutex) {
+ LOG.debug("leaveBarrier() at superstep: "
+ + getSuperstepCount() + " taskid:" + taskid.toString()
+ + " lowest notify other nodes.");
+ mutex.notifyAll();
+ }
+ }
+ });
+ if (null != s2) {
+ LOG.debug("leaveBarrier(): superstep:" + getSuperstepCount()
+ + " taskid:" + taskid.toString() + " wait for lowest notify.");
+ mutex.wait();
+ }
+ }
+ }
+ }
+ }
+
+ private String getNodeName() {
+ return bspRoot + "/" + taskid.getJobID().toString() + "/"
+ + getSuperstepCount() + "/" + taskid.toString();
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ synchronized (mutex) {
+ mutex.notify();
+ }
+ }
+
+ public void clear() {
+ this.localQueue.clear();
+ this.outgoingQueues.clear();
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.clear();
+ try {
+ zk.close();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ if (server != null)
+ server.stop();
+ if (null != messageSerializer)
+ this.messageSerializer.close();
+ }
+
+ @Override
+ public void put(BSPMessage msg) throws IOException {
+ this.localQueueForNextIteration.add(msg);
+ }
+
+ @Override
+ public void put(BSPMessageBundle messages) throws IOException {
+ for (BSPMessage message : messages.getMessages()) {
+ this.localQueueForNextIteration.add(message);
+ }
+ }
+
+ @Override
+ public long getProtocolVersion(String arg0, long arg1) throws IOException {
+ return BSPPeerInterface.versionID;
+ }
+
+ protected BSPPeerInterface getBSPPeerConnection(InetSocketAddress addr)
+ throws NullPointerException {
+ BSPPeerInterface peer;
+ synchronized (this.peers) {
+ peer = peers.get(addr);
+
+ if (peer == null) {
+ try {
+ peer = (BSPPeerInterface) RPC.getProxy(BSPPeerInterface.class,
+ BSPPeerInterface.versionID, addr, this.conf);
+ } catch (IOException e) {
+ LOG.error(e);
+ }
+ this.peers.put(addr, peer);
+ }
+ }
+
+ return peer;
+ }
+
+ /**
+ * @return the string as host:port of this Peer
+ */
+ public String getPeerName() {
+ return peerAddress.getHostName() + ":" + peerAddress.getPort();
+ }
+
+ private InetSocketAddress getAddress(String peerName) {
+ String[] peerAddrParts = peerName.split(":");
+ if (peerAddrParts.length != 2) {
+ throw new ArrayIndexOutOfBoundsException(
+ "Peername must consist of exactly ONE \":\"! Given peername was: "
+ + peerName);
+ }
+ return new InetSocketAddress(peerAddrParts[0],
+ Integer.parseInt(peerAddrParts[1]));
+ }
+
+ @Override
+ public String[] getAllPeerNames() {
+ String[] result = null;
+ try {
+ result = zk.getChildren("/" + jobConf.getJobID().toString(), this)
+ .toArray(new String[0]);
+ } catch (KeeperException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ return result;
+ }
+
+ /**
+ * @return the number of messages
+ */
+ public int getNumCurrentMessages() {
+ return localQueue.size();
+ }
+
+ /**
+ * Sets the current status
+ *
+ * @param currentTaskStatus
+ */
+ public void setCurrentTaskStatus(TaskStatus currentTaskStatus) {
+ this.currentTaskStatus = currentTaskStatus;
+ }
+
+ /**
+ * @return the count of current super-step
+ */
+ public long getSuperstepCount() {
+ return currentTaskStatus.getSuperstepCount();
+ }
+
+ /**
+ * Sets the job configuration
+ *
+ * @param jobConf
+ */
+ public void setJobConf(BSPJob jobConf) {
+ this.jobConf = jobConf;
+ }
+
+ /**
+ * @return the size of local queue
+ */
+ public int getLocalQueueSize() {
+ return localQueue.size();
+ }
+
+ /**
+ * @return the size of outgoing queue
+ */
+ public int getOutgoingQueueSize() {
+ return outgoingQueues.size();
+ }
+
+ /**
+ * Clears local queue
+ */
+ public void clearLocalQueue() {
+ this.localQueue.clear();
+ }
+
+ /**
+ * Clears outgoing queues
+ */
+ public void clearOutgoingQueues() {
+ this.outgoingQueues.clear();
+ }
+}
Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPPeerInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPPeerInterface.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPPeerInterface.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPPeerInterface.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hama.Constants;
+import org.apache.hama.ipc.HamaRPCProtocolVersion;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * BSP communication interface.
+ */
+public interface BSPPeerInterface extends HamaRPCProtocolVersion, Closeable,
+ Constants {
+
+ /**
+ * Send a data with a tag to another BSPSlave corresponding to hostname.
+ * Messages sent by this method are not guaranteed to be received in a sent
+ * order.
+ *
+ * @param peerName
+ * @param msg
+ * @throws IOException
+ */
+ public void send(String peerName, BSPMessage msg) throws IOException;
+
+ /**
+ * Puts a message to local queue.
+ *
+ * @param msg
+ * @throws IOException
+ */
+ public void put(BSPMessage msg) throws IOException;
+
+ /**
+ * Puts a bundle of messages to local queue.
+ *
+ * @param messages
+ * @throws IOException
+ */
+ public void put(BSPMessageBundle messages) throws IOException;
+
+ /**
+ * @return A message from the peer's received messages queue (a FIFO).
+ * @throws IOException
+ */
+ public BSPMessage getCurrentMessage() throws IOException;
+
+ /**
+ * @return The number of messages in the peer's received messages queue.
+ */
+ public int getNumCurrentMessages();
+
+ /**
+ * Barrier Synchronization.
+ *
+ * Sends all the messages in the outgoing message queues to the corresponding
+ * remote peers.
+ *
+ * @throws InterruptedException
+ * @throws KeeperException
+ */
+ public void sync() throws IOException, KeeperException, InterruptedException;
+
+ /**
+ * @return the count of current super-step
+ */
+ public long getSuperstepCount();
+
+ /**
+ * @return The name of this peer in the format "hostname:port".
+ */
+ public String getPeerName();
+
+ /**
+ * @return The names of all the peers executing tasks from the same job
+ * (including this peer).
+ */
+ public String[] getAllPeerNames();
+
+ /**
+ * Clears all queues entries.
+ */
+ public void clear();
+}
Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPPeerInterface.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPTask.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPTask.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPTask.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.ipc.BSPPeerProtocol;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Base class for tasks.
+ */
+public class BSPTask extends Task {
+
+ public static final Log LOG = LogFactory.getLog(BSPTask.class);
+
+ private BSPJob conf;
+
+ public BSPTask() {
+ }
+
+ public BSPTask(BSPJobID jobId, String jobFile, TaskAttemptID taskid, int partition) {
+ this.jobId = jobId;
+ this.jobFile = jobFile;
+ this.taskId = taskid;
+ this.partition = partition;
+ }
+
+ @Override
+ public BSPTaskRunner createRunner(GroomServer groom) {
+ return new BSPTaskRunner(this, groom, this.conf);
+ }
+
+ @Override
+ public void run(BSPJob job, BSPPeer bspPeer, BSPPeerProtocol umbilical)
+ throws IOException {
+
+ BSP bsp = (BSP) ReflectionUtils.newInstance(job.getConf().getClass(
+ "bsp.work.class", BSP.class), job.getConf());
+
+ try {
+ bsp.bsp(bspPeer);
+ } catch (IOException e) {
+ LOG.error("Exception during BSP execution!", e);
+ } catch (KeeperException e) {
+ LOG.error("Exception during BSP execution!", e);
+ } catch (InterruptedException e) {
+ LOG.error("Exception during BSP execution!", e);
+ }
+
+ done(umbilical);
+ }
+
+ public BSPJob getConf() {
+ return conf;
+ }
+
+ public void setConf(BSPJob conf) {
+ this.conf = conf;
+ }
+
+}
Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPTask.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPTaskRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPTaskRunner.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPTaskRunner.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPTaskRunner.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Base class that runs a task in a separate process.
+ */
+public class BSPTaskRunner extends TaskRunner {
+
+ public static final Log LOG = LogFactory.getLog(BSPTaskRunner.class);
+
+ public BSPTaskRunner(BSPTask bspTask, GroomServer groom, BSPJob conf) {
+ super(bspTask, groom, conf);
+ }
+
+}
Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPTaskRunner.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BooleanMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BooleanMessage.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BooleanMessage.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BooleanMessage.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A message that consists of a string tag and a boolean value.
+ */
+public class BooleanMessage extends BSPMessage {
+
+ String tag;
+ boolean data;
+
+ public BooleanMessage() {
+ super();
+ }
+
+ public BooleanMessage(String tag, boolean data) {
+ super();
+ this.tag = tag;
+ this.data = data;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(tag);
+ out.writeBoolean(data);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ tag = in.readUTF();
+ data = in.readBoolean();
+ }
+
+ @Override
+ public String getTag() {
+ return tag;
+ }
+
+ @Override
+ public Boolean getData() {
+ return data;
+ }
+}
\ No newline at end of file
Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BooleanMessage.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/ByteMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/ByteMessage.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/ByteMessage.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/ByteMessage.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A message that consists of a byte tag and a byte data.
+ */
+public class ByteMessage extends BSPMessage {
+
+ private byte[] tag;
+ private byte[] data;
+
+ public ByteMessage() {
+ super();
+ }
+
+ public ByteMessage(byte[] tag, byte[] data) {
+ super();
+ this.tag = tag;
+ this.data = data;
+ }
+
+ @Override
+ public byte[] getTag() {
+ return this.tag;
+ }
+
+ @Override
+ public byte[] getData() {
+ return this.data;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.tag = new byte[in.readInt()];
+ in.readFully(tag, 0, this.tag.length);
+ this.data = new byte[in.readInt()];
+ in.readFully(data, 0, this.data.length);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(tag.length);
+ out.write(tag);
+ out.writeInt(data.length);
+ out.write(data);
+ }
+
+}
Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/ByteMessage.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/ClusterStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/ClusterStatus.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/ClusterStatus.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/ClusterStatus.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Status information on the current state of the BSP cluster.
+ *
+ * <p>
+ * <code>ClusterStatus</code> provides clients with information such as:
+ * <ol>
+ * <li>
+ * Size of the cluster.</li>
+ * <li>
+ * Name of the grooms.</li>
+ * <li>
+ * Task capacity of the cluster.</li>
+ * <li>
+ * The number of currently running bsp tasks.</li>
+ * <li>
+ * State of the <code>BSPMaster</code>.</li>
+ * </ol>
+ * </p>
+ *
+ * <p>
+ * Clients can query for the latest <code>ClusterStatus</code>, via
+ * {@link BSPJobClient#getClusterStatus(boolean)}.
+ * </p>
+ *
+ * @see BSPMaster
+ */
+public class ClusterStatus implements Writable {
+
+ private int numActiveGrooms;
+ private Map<String, GroomServerStatus> activeGrooms = new HashMap<String, GroomServerStatus>();
+ private Map<String, String> cachedActiveGroomNames = null;
+ private int tasks;
+ private int maxTasks;
+ private BSPMaster.State state;
+
+ /**
+ *
+ */
+ public ClusterStatus() {
+ }
+
+ public ClusterStatus(int grooms, int tasks, int maxTasks,
+ BSPMaster.State state) {
+ this.numActiveGrooms = grooms;
+ this.tasks = tasks;
+ this.maxTasks = maxTasks;
+ this.state = state;
+ }
+
+ public ClusterStatus(Map<String, GroomServerStatus> activeGrooms, int tasks,
+ int maxTasks, BSPMaster.State state) {
+ this(activeGrooms.size(), tasks, maxTasks, state);
+ this.activeGrooms = activeGrooms;
+ }
+
+ /**
+ * Get the number of groom servers in the cluster.
+ *
+ * @return the number of groom servers in the cluster.
+ */
+ public int getGroomServers() {
+ return numActiveGrooms;
+ }
+
+ /**
+ * Get the names of groom servers, and their hostnames, in the cluster.
+ *
+ * @return the active groom servers in the cluster.
+ */
+ public Map<String, String> getActiveGroomNames() {
+ if (cachedActiveGroomNames == null) {
+ if (activeGrooms != null) {
+ Map<String, String> map = new HashMap<String, String>();
+ for (Entry<String, GroomServerStatus> entry : activeGrooms.entrySet()) {
+ map.put(entry.getKey(), entry.getValue().getGroomHostName());
+ }
+ cachedActiveGroomNames = map;
+ }
+ }
+ return cachedActiveGroomNames;
+ }
+
+ /**
+ * Get the names of groom servers, and their current status in the cluster.
+ *
+ * @return the active groom servers in the cluster.
+ */
+ public Map<String, GroomServerStatus> getActiveGroomServerStatus() {
+ return activeGrooms;
+ }
+
+ /**
+ * Get the number of currently running tasks in the cluster.
+ *
+ * @return the number of currently running tasks in the cluster.
+ */
+ public int getTasks() {
+ return tasks;
+ }
+
+ /**
+ * Get the maximum capacity for running tasks in the cluster.
+ *
+ * @return the maximum capacity for running tasks in the cluster.
+ */
+ public int getMaxTasks() {
+ return maxTasks;
+ }
+
+ /**
+ * Get the current state of the <code>BSPMaster</code>, as
+ * {@link BSPMaster.State}
+ *
+ * @return the current state of the <code>BSPMaster</code>.
+ */
+ public BSPMaster.State getBSPMasterState() {
+ return state;
+ }
+
+ // ////////////////////////////////////////////
+ // Writable
+ // ////////////////////////////////////////////
+ @Override
+ public void write(DataOutput out) throws IOException {
+ if (activeGrooms.isEmpty()) {
+ out.writeInt(numActiveGrooms);
+ out.writeBoolean(false);
+ } else {
+ out.writeInt(activeGrooms.size());
+ out.writeBoolean(true);
+
+ for (Entry<String, GroomServerStatus> entry : activeGrooms.entrySet()) {
+ out.writeUTF(entry.getKey());
+ entry.getValue().write(out);
+ }
+
+ }
+ out.writeInt(tasks);
+ out.writeInt(maxTasks);
+ WritableUtils.writeEnum(out, state);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ numActiveGrooms = in.readInt();
+ boolean groomListFollows = in.readBoolean();
+
+ if (groomListFollows) {
+ activeGrooms = new HashMap<String, GroomServerStatus>(numActiveGrooms);
+
+ for (int i = 0; i < numActiveGrooms; i++) {
+ final String groomName = in.readUTF();
+ final GroomServerStatus status = new GroomServerStatus();
+ status.readFields(in);
+ activeGrooms.put(groomName, status);
+ }
+ }
+
+ tasks = in.readInt();
+ maxTasks = in.readInt();
+ state = WritableUtils.readEnum(in, BSPMaster.State.class);
+ }
+}
Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/ClusterStatus.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/CommitTaskAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/CommitTaskAction.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/CommitTaskAction.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/CommitTaskAction.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster}
+ * to the {@link org.apache.hama.bsp.GroomServer} to commit the output
+ * of the task.
+ */
+class CommitTaskAction extends GroomServerAction {
+ private TaskAttemptID taskId;
+
+ public CommitTaskAction() {
+ super(ActionType.COMMIT_TASK);
+ taskId = new TaskAttemptID();
+ }
+
+ public CommitTaskAction(TaskAttemptID taskId) {
+ super(ActionType.COMMIT_TASK);
+ this.taskId = taskId;
+ }
+
+ public TaskAttemptID getTaskID() {
+ return taskId;
+ }
+
+ public void write(DataOutput out) throws IOException {
+ taskId.write(out);
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ taskId.readFields(in);
+ }
+}
Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/CommitTaskAction.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/Directive.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/Directive.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/Directive.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/Directive.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A generic directive from the {@link org.apache.hama.bsp.BSPMaster} to the
+ * {@link org.apache.hama.bsp.GroomServer} to take some 'action'.
+ */
+public class Directive implements Writable{
+
+ protected long timestamp;
+ protected Directive.Type type;
+
+ public static enum Type {
+ Request(1), Response(2);
+ int t;
+
+ Type(int t) {
+ this.t = t;
+ }
+
+ public int value() {
+ return this.t;
+ }
+ };
+
+ public Directive(){}
+
+ public Directive(Directive.Type type) {
+ this.timestamp = System.currentTimeMillis();
+ this.type = type;
+ }
+
+ public long getTimestamp() {
+ return this.timestamp;
+ }
+
+ public Directive.Type getType() {
+ return this.type;
+ }
+
+ /**
+ * Command for BSPMaster or GroomServer to execute.
+ public abstract void execute() throws Exception;
+ */
+
+ public void write(DataOutput out) throws IOException {
+ out.writeLong(this.timestamp);
+ out.writeInt(this.type.value());
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ this.timestamp = in.readLong();
+ int t = in.readInt();
+ if (Directive.Type.Request.value() == t) {
+ this.type = Directive.Type.Request;
+ }else{
+ this.type = Directive.Type.Response;
+ }
+ }
+}
Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/Directive.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DirectiveException.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DirectiveException.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DirectiveException.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DirectiveException.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+/**
+ * A custom exception class for Directive.
+ */
+public class DirectiveException extends RuntimeException{
+ private static final long serialVersionUID = -8052582046894492822L;
+
+ public DirectiveException(){
+ super();
+ }
+
+ public DirectiveException(String message){
+ super(message);
+ }
+
+ public DirectiveException(String message, Throwable t){
+ super(message, t);
+ }
+}
Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DirectiveException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DirectiveHandler.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DirectiveHandler.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DirectiveHandler.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DirectiveHandler.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+/**
+ * A DirectiveHandler interface.
+ */
+public interface DirectiveHandler{
+
+ /**
+ * Handle directives on demand.
+ * @param directive to be handled.
+ */
+ void handle(Directive directive) throws DirectiveException;
+}
Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DirectiveHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DispatchTasksDirective.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DispatchTasksDirective.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DispatchTasksDirective.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DispatchTasksDirective.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Handles the tasks dispatching between the BSPMaster and the GroomServers.
+ */
+public final class DispatchTasksDirective extends Directive implements Writable {
+
+ public static final Log LOG = LogFactory.getLog(DispatchTasksDirective.class);
+
+ private GroomServerAction[] actions;
+
+ public DispatchTasksDirective() {
+ super();
+ }
+
+ public DispatchTasksDirective(GroomServerAction[] actions) {
+ super(Directive.Type.Request);
+ this.actions = actions;
+ }
+
+ public GroomServerAction[] getActions() {
+ return this.actions;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ if (this.actions == null) {
+ WritableUtils.writeVInt(out, 0);
+ } else {
+ WritableUtils.writeVInt(out, actions.length);
+ for (GroomServerAction action : this.actions) {
+ WritableUtils.writeEnum(out, action.getActionType());
+ action.write(out);
+ }
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ int length = WritableUtils.readVInt(in);
+ if (length > 0) {
+ this.actions = new GroomServerAction[length];
+ for (int i = 0; i < length; ++i) {
+ GroomServerAction.ActionType actionType = WritableUtils.readEnum(in,
+ GroomServerAction.ActionType.class);
+ actions[i] = GroomServerAction.createAction(actionType);
+ actions[i].readFields(in);
+ }
+ } else {
+ this.actions = null;
+ }
+ }
+}
Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DispatchTasksDirective.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DoubleMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DoubleMessage.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DoubleMessage.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DoubleMessage.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A message that consists of a string tag and a double data.
+ */
+public class DoubleMessage extends BSPMessage {
+
+ private String tag;
+ private double data;
+
+ public DoubleMessage() {
+ super();
+ }
+
+ public DoubleMessage(String tag, double data) {
+ super();
+ this.data = data;
+ this.tag = tag;
+ }
+
+ @Override
+ public String getTag() {
+ return tag;
+ }
+
+ @Override
+ public Double getData() {
+ return data;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(tag);
+ out.writeDouble(data);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ tag = in.readUTF();
+ data = in.readDouble();
+ }
+
+}
Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/DoubleMessage.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/FCFSQueue.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/FCFSQueue.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/FCFSQueue.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/FCFSQueue.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+class FCFSQueue implements Queue<JobInProgress> {
+
+ public static final Log LOG = LogFactory.getLog(FCFSQueue.class);
+ private final String name;
+ private BlockingQueue<JobInProgress> queue = new LinkedBlockingQueue<JobInProgress>();
+
+ public FCFSQueue(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String getName() {
+ return this.name;
+ }
+
+ @Override
+ public void addJob(JobInProgress job) {
+ try {
+ queue.put(job);
+ } catch (InterruptedException ie) {
+ LOG.error("Fail to add a job to the " + this.name + " queue.", ie);
+ }
+ }
+
+ @Override
+ public void removeJob(JobInProgress job) {
+ queue.remove(job);
+ }
+
+ @Override
+ public JobInProgress removeJob() {
+ try {
+ return queue.take();
+ } catch (InterruptedException ie) {
+ LOG.error("Fail to remove a job from the " + this.name + " queue.", ie);
+ }
+ return null;
+ }
+
+ @Override
+ public Collection<JobInProgress> jobs() {
+ return queue;
+ }
+
+}
Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/FCFSQueue.java
------------------------------------------------------------------------------
svn:eol-style = native