You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2011/02/27 11:59:00 UTC
svn commit: r1075021 - in /incubator/hama/trunk/src: java/org/apache/hama/
java/org/apache/hama/bsp/ java/org/apache/hama/util/
java/org/apache/hama/zookeeper/ test/org/apache/hama/bsp/
Author: edwardyoon
Date: Sun Feb 27 10:58:58 2011
New Revision: 1075021
URL: http://svn.apache.org/viewvc?rev=1075021&view=rev
Log:
Add javadoc comments.
Modified:
incubator/hama/trunk/src/java/org/apache/hama/BSPMasterRunner.java
incubator/hama/trunk/src/java/org/apache/hama/Constants.java
incubator/hama/trunk/src/java/org/apache/hama/GroomServerRunner.java
incubator/hama/trunk/src/java/org/apache/hama/HamaConfiguration.java
incubator/hama/trunk/src/java/org/apache/hama/MiniZooKeeperCluster.java
incubator/hama/trunk/src/java/org/apache/hama/ZooKeeperRunner.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobID.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerProtocol.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/HeartbeatResponse.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/ID.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/Queue.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/QueueManager.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskID.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskRunner.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java
incubator/hama/trunk/src/java/org/apache/hama/util/RunJar.java
incubator/hama/trunk/src/java/org/apache/hama/util/VersionInfo.java
incubator/hama/trunk/src/java/org/apache/hama/zookeeper/QuorumPeer.java
incubator/hama/trunk/src/java/org/apache/hama/zookeeper/ZKServerTool.java
incubator/hama/trunk/src/test/org/apache/hama/bsp/TestDirective.java
Modified: incubator/hama/trunk/src/java/org/apache/hama/BSPMasterRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/BSPMasterRunner.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/BSPMasterRunner.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/BSPMasterRunner.java Sun Feb 27 10:58:58 2011
@@ -26,7 +26,7 @@ import org.apache.hadoop.util.ToolRunner
import org.apache.hama.bsp.BSPMaster;
/**
- * This class starts and runs the BSPMaster
+ * This class starts and runs the BSPMaster.
*/
public class BSPMasterRunner extends Configured implements Tool {
Modified: incubator/hama/trunk/src/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Constants.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/Constants.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/Constants.java Sun Feb 27 10:58:58 2011
@@ -20,7 +20,7 @@
package org.apache.hama;
/**
- * Some constants used in the Hama
+ * Some constants used in Hama.
*/
public interface Constants {
Modified: incubator/hama/trunk/src/java/org/apache/hama/GroomServerRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/GroomServerRunner.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/GroomServerRunner.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/GroomServerRunner.java Sun Feb 27 10:58:58 2011
@@ -27,7 +27,7 @@ import org.apache.hadoop.util.ToolRunner
import org.apache.hama.bsp.GroomServer;
/**
- * This class starts and runs the GroomServer
+ * This class starts and runs the GroomServer.
*/
public class GroomServerRunner extends Configured implements Tool {
Modified: incubator/hama/trunk/src/java/org/apache/hama/HamaConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/HamaConfiguration.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/HamaConfiguration.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/HamaConfiguration.java Sun Feb 27 10:58:58 2011
@@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
/**
- * Adds Hama configuration files to a Configuration
+ * Adds Hama configuration files to a Configuration.
*/
public class HamaConfiguration extends Configuration {
/** constructor */
Modified: incubator/hama/trunk/src/java/org/apache/hama/MiniZooKeeperCluster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/MiniZooKeeperCluster.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/MiniZooKeeperCluster.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/MiniZooKeeperCluster.java Sun Feb 27 10:58:58 2011
@@ -34,6 +34,9 @@ import org.apache.zookeeper.server.NIOSe
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnLog;
+/**
+ * This class starts and runs the MiniZooKeeperCluster.
+ */
public class MiniZooKeeperCluster {
private static final Log LOG = LogFactory.getLog(MiniZooKeeperCluster.class);
Modified: incubator/hama/trunk/src/java/org/apache/hama/ZooKeeperRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/ZooKeeperRunner.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/ZooKeeperRunner.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/ZooKeeperRunner.java Sun Feb 27 10:58:58 2011
@@ -25,7 +25,7 @@ import org.apache.hadoop.util.ToolRunner
import org.apache.hama.zookeeper.QuorumPeer;
/**
- * This class starts and runs the ZooKeeperServer
+ * This class starts and runs the ZooKeeperServer.
*/
public class ZooKeeperRunner extends Configured implements Tool {
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java Sun Feb 27 10:58:58 2011
@@ -18,7 +18,7 @@
package org.apache.hama.bsp;
/**
- * This class provides an abstract implementation of the BSP interface
+ * This class provides an abstract implementation of the BSP interface.
*/
public abstract class BSP implements BSPInterface {
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java Sun Feb 27 10:58:58 2011
@@ -25,6 +25,11 @@ import java.util.Enumeration;
import org.apache.hadoop.fs.Path;
import org.apache.hama.HamaConfiguration;
+/**
+ * A BSP job configuration.
+ *
+ * BSPJob is the primary interface for a user to describe a BSP job to the Hama BSP framework for execution.
+ */
public class BSPJob extends BSPJobContext {
public static enum JobState {
DEFINE, RUNNING
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java Sun Feb 27 10:58:58 2011
@@ -40,6 +40,13 @@ import org.apache.hadoop.util.ToolRunner
import org.apache.hama.HamaConfiguration;
import org.apache.hama.ipc.JobSubmissionProtocol;
+/**
+ * BSPJobClient is the primary interface for the user-job to interact with the
+ * BSPMaster.
+ *
+ * BSPJobClient provides facilities to submit jobs, track their progress, access
+ * component-tasks' reports/logs, get the BSP cluster status information etc.
+ */
public class BSPJobClient extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(BSPJobClient.class);
@@ -191,9 +198,9 @@ public class BSPJobClient extends Config
public void init(Configuration conf) throws IOException {
this.jobSubmitClient = (JobSubmissionProtocol) RPC.getProxy(
- JobSubmissionProtocol.class, JobSubmissionProtocol.versionID,
- BSPMaster.getAddress(conf), conf,
- NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
+ JobSubmissionProtocol.class, JobSubmissionProtocol.versionID, BSPMaster
+ .getAddress(conf), conf, NetUtils.getSocketFactory(conf,
+ JobSubmissionProtocol.class));
}
/**
@@ -330,8 +337,8 @@ public class BSPJobClient extends Config
//
// Now, actually submit the job (using the submit name)
//
- JobStatus status = jobSubmitClient.submitJob(jobId,
- submitJobFile.toString());
+ JobStatus status = jobSubmitClient.submitJob(jobId, submitJobFile
+ .toString());
if (status != null) {
return new NetworkedJob(status);
} else {
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobID.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobID.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobID.java Sun Feb 27 10:58:58 2011
@@ -25,6 +25,9 @@ import java.text.NumberFormat;
import org.apache.hadoop.io.Text;
+/**
+ * BSPJobID represents the immutable and unique identifier for the job.
+ */
public class BSPJobID extends ID implements Comparable<ID> {
protected static final String JOB = "job";
private final Text jtIdentifier;
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java Sun Feb 27 10:58:58 2011
@@ -23,6 +23,10 @@ import java.io.IOException;
import org.apache.hadoop.io.Writable;
+/**
+ * BSPMessage consists of a tag and an arbitrary amount of data to be
+ * communicated.
+ */
public class BSPMessage implements Writable {
protected byte[] tag;
protected byte[] data;
@@ -31,7 +35,7 @@ public class BSPMessage implements Writa
}
/**
- * Constructor
+ * Constructor
*
* @param tag of data
* @param data of message
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Sun Feb 27 10:58:58 2011
@@ -23,9 +23,9 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -39,10 +39,13 @@ import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
+/**
+ *
+ */
public class BSPPeer implements Watcher, BSPPeerInterface {
public static final Log LOG = LogFactory.getLog(BSPPeer.class);
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java Sun Feb 27 10:58:58 2011
@@ -23,6 +23,9 @@ import java.io.IOException;
import org.apache.hama.Constants;
import org.apache.zookeeper.KeeperException;
+/**
+ * BSP communication interface.
+ */
public interface BSPPeerInterface extends BSPRPCProtocolVersion, Closeable,
Constants {
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerProtocol.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerProtocol.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerProtocol.java Sun Feb 27 10:58:58 2011
@@ -20,6 +20,7 @@ package org.apache.hama.bsp;
import java.io.IOException;
/**
+ * Protocol that task child process uses to contact its parent process.
*/
public interface BSPPeerProtocol extends BSPPeerInterface {
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java Sun Feb 27 10:58:58 2011
@@ -22,6 +22,9 @@ import java.io.IOException;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.zookeeper.KeeperException;
+/**
+ * Base class for tasks.
+ */
public class BSPTask extends Task {
private BSPJob conf;
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java Sun Feb 27 10:58:58 2011
@@ -20,6 +20,9 @@ package org.apache.hama.bsp;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+/**
+ * Base class that runs a task in a separate process.
+ */
public class BSPTaskRunner extends TaskRunner {
public static final Log LOG = LogFactory.getLog(BSPTaskRunner.class);
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Sun Feb 27 10:58:58 2011
@@ -59,6 +59,14 @@ import org.apache.hama.ipc.WorkerProtoco
import org.apache.log4j.LogManager;
import org.apache.zookeeper.KeeperException;
+/**
+ * A Groom Server (referred to as a groom for short) is a process that performs
+ * BSP tasks assigned by the BSPMaster. Each groom contacts the BSPMaster, takes
+ * its assigned tasks, and reports its status to the BSPMaster through periodic
+ * piggybacked messages. Each groom is designed to run with HDFS or another
+ * distributed storage system. Typically, a groom server and a data node should
+ * run on the same physical node.
+ */
public class GroomServer implements Runnable, WorkerProtocol, BSPPeerProtocol {
public static final Log LOG = LogFactory.getLog(GroomServer.class);
@@ -133,9 +141,8 @@ public class GroomServer implements Runn
}
if (localHostname == null) {
- this.localHostname = DNS.getDefaultHost(
- conf.get("bsp.dns.interface", "default"),
- conf.get("bsp.dns.nameserver", "default"));
+ this.localHostname = DNS.getDefaultHost(conf.get("bsp.dns.interface",
+ "default"), conf.get("bsp.dns.nameserver", "default"));
}
// check local disk
checkLocalDirs(conf.getStrings("bsp.local.dir"));
@@ -301,7 +308,7 @@ public class GroomServer implements Runn
Thread.sleep(REPORT_INTERVAL);
} catch (InterruptedException ie) {
}
-
+
try {
if (justInited) {
String dir = masterClient.getSystemDir();
@@ -628,8 +635,8 @@ public class GroomServer implements Runn
* Update and report refresh status back to BSPMaster.
*/
private void doReport(TaskStatus taskStatus) {
- GroomServerStatus gss = new GroomServerStatus(groomServerName,
- bspPeer.getPeerName(), updateTaskStatus(taskStatus), failures,
+ GroomServerStatus gss = new GroomServerStatus(groomServerName, bspPeer
+ .getPeerName(), updateTaskStatus(taskStatus), failures,
maxCurrentTasks, rpcServer);
try {
boolean ret = masterClient.report(new Directive(gss));
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java Sun Feb 27 10:58:58 2011
@@ -33,33 +33,36 @@ import org.apache.hadoop.io.WritableFact
import org.apache.hadoop.io.WritableFactory;
/**
- *
+ * A GroomServerStatus is a BSP primitive. Keeps info on a GroomServer. The
+ * BSPMaster maintains a set of the most recent GroomServerStatus objects for
+ * each unique GroomServer it knows about.
*/
public class GroomServerStatus implements Writable {
public static final Log LOG = LogFactory.getLog(GroomServerStatus.class);
-
+
static {
- WritableFactories.setFactory
- (GroomServerStatus.class,
- new WritableFactory() {
- public Writable newInstance() { return new GroomServerStatus(); }
- });
+ WritableFactories.setFactory(GroomServerStatus.class,
+ new WritableFactory() {
+ public Writable newInstance() {
+ return new GroomServerStatus();
+ }
+ });
}
-
+
String groomName;
String peerName;
String rpcServer;
int failures;
List<TaskStatus> taskReports;
-
+
volatile long lastSeen;
private int maxTasks;
public GroomServerStatus() {
- //taskReports = new ArrayList<TaskStatus>();
+ // taskReports = new ArrayList<TaskStatus>();
taskReports = new CopyOnWriteArrayList<TaskStatus>();
}
-
+
public GroomServerStatus(String groomName, String peerName,
List<TaskStatus> taskReports, int failures, int maxTasks) {
this(groomName, peerName, taskReports, failures, maxTasks, "");
@@ -73,44 +76,44 @@ public class GroomServerStatus implement
this.failures = failures;
this.maxTasks = maxTasks;
this.rpcServer = rpc;
- }
+ }
public String getGroomName() {
return groomName;
}
-
+
/**
* The host (and port) from where the groom server can be reached.
- *
+ *
* @return The groom server address in the form of "hostname:port"
*/
public String getPeerName() {
return peerName;
}
- public String getRpcServer(){
+ public String getRpcServer() {
return rpcServer;
}
-
+
/**
- * Get the current tasks at the GroomServer.
- * Tasks are tracked by a {@link TaskStatus} object.
+ * Get the current tasks at the GroomServer. Tasks are tracked by a
+ * {@link TaskStatus} object.
*
- * @return a list of {@link TaskStatus} representing
- * the current tasks at the GroomServer.
+ * @return a list of {@link TaskStatus} representing the current tasks at the
+ * GroomServer.
*/
public List<TaskStatus> getTaskReports() {
return taskReports;
}
-
+
public int getFailures() {
return failures;
- }
-
+ }
+
public long getLastSeen() {
return lastSeen;
}
-
+
public void setLastSeen(long lastSeen) {
this.lastSeen = lastSeen;
}
@@ -118,7 +121,7 @@ public class GroomServerStatus implement
public int getMaxTasks() {
return maxTasks;
}
-
+
/**
* Return the number of tasks currently in the RUNNING or UNASSIGNED state
*/
@@ -127,61 +130,60 @@ public class GroomServerStatus implement
for (Iterator<TaskStatus> it = taskReports.iterator(); it.hasNext();) {
TaskStatus ts = it.next();
TaskStatus.State state = ts.getRunState();
- if(state == TaskStatus.State.RUNNING ||
- state == TaskStatus.State.UNASSIGNED) {
+ if (state == TaskStatus.State.RUNNING
+ || state == TaskStatus.State.UNASSIGNED) {
taskCount++;
- }
- }
-
- return taskCount;
+ }
+ }
+
+ return taskCount;
}
/**
- * For BSPMaster to distinguish between
- * different GroomServers, because
- * BSPMaster stores using GroomServerStatus
- * as key.
- */
+ * For BSPMaster to distinguish between different GroomServers, because
+ * BSPMaster stores using GroomServerStatus as key.
+ */
@Override
- public int hashCode(){
+ public int hashCode() {
int result = 17;
- result = 37*result + groomName.hashCode();
- result = 37*result + peerName.hashCode();
- result = 37*result + rpcServer.hashCode();
+ result = 37 * result + groomName.hashCode();
+ result = 37 * result + peerName.hashCode();
+ result = 37 * result + rpcServer.hashCode();
/*
- result = 37*result + (int)failures;
- result = 37*result + taskReports.hashCode();
- result = 37*result + (int)(lastSeen^(lastSeen>>>32));
- result = 37*result + (int)maxTasks;
- */
+ * result = 37*result + (int)failures; result = 37*result +
+ * taskReports.hashCode(); result = 37*result +
+ * (int)(lastSeen^(lastSeen>>>32)); result = 37*result + (int)maxTasks;
+ */
return result;
}
@Override
- public boolean equals(Object o){
- if (o == this) return true;
- if (null == o) return false;
- if (getClass() != o.getClass()) return false;
+ public boolean equals(Object o) {
+ if (o == this)
+ return true;
+ if (null == o)
+ return false;
+ if (getClass() != o.getClass())
+ return false;
GroomServerStatus s = (GroomServerStatus) o;
- if(!s.groomName.equals(groomName)) return false;
- if(!s.peerName.equals(peerName)) return false;
- if(!s.rpcServer.equals(rpcServer)) return false;
- /*
- if(s.failures != failures) return false;
- if(null == s.taskReports){
- if(null != s.taskReports)
- return false;
- }else if(!s.taskReports.equals(taskReports)){
+ if (!s.groomName.equals(groomName))
return false;
- }
- if(s.lastSeen != lastSeen) return false;
- if(s.maxTasks != maxTasks) return false;
- */
+ if (!s.peerName.equals(peerName))
+ return false;
+ if (!s.rpcServer.equals(rpcServer))
+ return false;
+ /*
+ * if(s.failures != failures) return false; if(null == s.taskReports){
+ * if(null != s.taskReports) return false; }else
+ * if(!s.taskReports.equals(taskReports)){ return false; } if(s.lastSeen !=
+ * lastSeen) return false; if(s.maxTasks != maxTasks) return false;
+ */
return true;
}
-
- /* (non-Javadoc)
+
+ /*
+ * (non-Javadoc)
* @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
*/
@Override
@@ -193,16 +195,17 @@ public class GroomServerStatus implement
this.maxTasks = in.readInt();
taskReports.clear();
int numTasks = in.readInt();
-
+
TaskStatus status;
for (int i = 0; i < numTasks; i++) {
status = new TaskStatus();
status.readFields(in);
taskReports.add(status);
- }
+ }
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
* @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
*/
@Override
@@ -213,7 +216,7 @@ public class GroomServerStatus implement
out.writeInt(failures);
out.writeInt(maxTasks);
out.writeInt(taskReports.size());
- for(TaskStatus taskStatus : taskReports) {
+ for (TaskStatus taskStatus : taskReports) {
taskStatus.write(out);
}
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/HeartbeatResponse.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/HeartbeatResponse.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/HeartbeatResponse.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/HeartbeatResponse.java Sun Feb 27 10:58:58 2011
@@ -30,6 +30,9 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
+/**
+ * A HeartbeatResponse class.
+ */
public class HeartbeatResponse implements Writable, Configurable {
private Configuration conf;
short responseId;
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/ID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/ID.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/ID.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/ID.java Sun Feb 27 10:58:58 2011
@@ -23,6 +23,11 @@ import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
+/**
+ * A general identifier, which internally stores the id as an integer. This is
+ * the super class of {@link BSPJobID}, {@link TaskID} and
+ * {@link TaskAttemptID}.
+ */
public abstract class ID implements WritableComparable<ID> {
protected static final char SEPARATOR = '_';
protected int id;
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java Sun Feb 27 10:58:58 2011
@@ -28,6 +28,9 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
+/**
+ * Describes the current status of a job.
+ */
public class JobStatus implements Writable, Cloneable {
public static final Log LOG = LogFactory.getLog(JobStatus.class);
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/Queue.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Queue.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/Queue.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Queue.java Sun Feb 27 10:58:58 2011
@@ -19,6 +19,11 @@ package org.apache.hama.bsp;
import java.util.Collection;
+/**
+ * Job Queue interface.
+ *
+ * @param <T>
+ */
public interface Queue<T>{
/**
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/QueueManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/QueueManager.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/QueueManager.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/QueueManager.java Sun Feb 27 10:58:58 2011
@@ -24,6 +24,9 @@ import java.util.concurrent.ConcurrentHa
import org.apache.hadoop.conf.Configuration;
+/**
+ * A BSPJob Queue Manager.
+ */
public class QueueManager{
private ConcurrentMap<String, Queue<JobInProgress>> queues =
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java Sun Feb 27 10:58:58 2011
@@ -28,6 +28,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hama.ipc.WorkerProtocol;
+/**
+ * A simple task scheduler.
+ */
class SimpleTaskScheduler extends TaskScheduler {
private static final Log LOG = LogFactory.getLog(SimpleTaskScheduler.class);
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java Sun Feb 27 10:58:58 2011
@@ -27,6 +27,9 @@ import org.apache.hadoop.fs.LocalDirAllo
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+/**
+ * Base class for tasks.
+ */
public abstract class Task implements Writable {
public static final Log LOG = LogFactory.getLog(Task.class);
// //////////////////////////////////////////
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java Sun Feb 27 10:58:58 2011
@@ -36,9 +36,8 @@ public class TaskAttemptID extends ID {
this.taskId = taskId;
}
- public TaskAttemptID(String jtIdentifier, int jobId, boolean isMatrixTask,
- int taskId, int id) {
- this(new TaskID(jtIdentifier, jobId, isMatrixTask, taskId), id);
+ public TaskAttemptID(String jtIdentifier, int jobId, int taskId, int id) {
+ this(new TaskID(jtIdentifier, jobId, taskId), id);
}
public TaskAttemptID() {
@@ -48,7 +47,7 @@ public class TaskAttemptID extends ID {
public BSPJobID getJobID() {
return taskId.getJobID();
}
-
+
public TaskID getTaskID() {
return taskId;
}
@@ -104,19 +103,10 @@ public class TaskAttemptID extends ID {
return null;
try {
String[] parts = str.split(Character.toString(SEPARATOR));
- if (parts.length == 6) {
+ if (parts.length == 5) {
if (parts[0].equals(ATTEMPT)) {
- boolean isMatrixTask = false;
- if (parts[3].equals("m"))
- isMatrixTask = true;
- else if (parts[3].equals("g"))
- isMatrixTask = false;
- else
- throw new Exception();
-
return new TaskAttemptID(parts[1], Integer.parseInt(parts[2]),
- isMatrixTask, Integer.parseInt(parts[4]), Integer
- .parseInt(parts[5]));
+ Integer.parseInt(parts[3]), Integer.parseInt(parts[4]));
}
}
} catch (Exception ex) {
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskID.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskID.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskID.java Sun Feb 27 10:58:58 2011
@@ -22,6 +22,9 @@ import java.io.DataOutput;
import java.io.IOException;
import java.text.NumberFormat;
+/**
+ * TaskID represents the immutable and unique identifier for a BSP Task.
+ */
public class TaskID extends ID {
protected static final String TASK = "task";
protected static final NumberFormat idFormat = NumberFormat.getInstance();
@@ -31,19 +34,17 @@ public class TaskID extends ID {
}
private BSPJobID jobId;
- private boolean isMatrixTask;
- public TaskID(BSPJobID jobId, boolean isMatrixTask, int id) {
+ public TaskID(BSPJobID jobId, int id) {
super(id);
if (jobId == null) {
throw new IllegalArgumentException("jobId cannot be null");
}
this.jobId = jobId;
- this.isMatrixTask = isMatrixTask;
}
- public TaskID(String jtIdentifier, int jobId, boolean isGraphTask, int id) {
- this(new BSPJobID(jtIdentifier, jobId), isGraphTask, id);
+ public TaskID(String jtIdentifier, int jobId, int id) {
+ this(new BSPJobID(jtIdentifier, jobId), id);
}
public TaskID() {
@@ -55,18 +56,13 @@ public class TaskID extends ID {
return jobId;
}
- public boolean isGraphTask() {
- return isMatrixTask;
- }
-
@Override
public boolean equals(Object o) {
if (!super.equals(o))
return false;
TaskID that = (TaskID) o;
- return this.isMatrixTask == that.isMatrixTask
- && this.jobId.equals(that.jobId);
+ return this.jobId.equals(that.jobId);
}
@Override
@@ -74,10 +70,7 @@ public class TaskID extends ID {
TaskID that = (TaskID) o;
int jobComp = this.jobId.compareTo(that.jobId);
if (jobComp == 0) {
- if (this.isMatrixTask == that.isMatrixTask) {
- return this.id - that.id;
- } else
- return this.isMatrixTask ? -1 : 1;
+ return this.id - that.id;
} else {
return jobComp;
}
@@ -89,8 +82,8 @@ public class TaskID extends ID {
}
protected StringBuilder appendTo(StringBuilder builder) {
- return jobId.appendTo(builder).append(SEPARATOR).append(
- isMatrixTask ? 'm' : 'g').append(SEPARATOR).append(idFormat.format(id));
+ return jobId.appendTo(builder).append(SEPARATOR)
+ .append(idFormat.format(id));
}
@Override
@@ -102,14 +95,12 @@ public class TaskID extends ID {
public void readFields(DataInput in) throws IOException {
super.readFields(in);
jobId.readFields(in);
- isMatrixTask = in.readBoolean();
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
jobId.write(out);
- out.writeBoolean(isMatrixTask);
}
public static TaskID forName(String str) throws IllegalArgumentException {
@@ -119,15 +110,8 @@ public class TaskID extends ID {
String[] parts = str.split("_");
if (parts.length == 5) {
if (parts[0].equals(TASK)) {
- boolean isMatrixTask = false;
- if (parts[3].equals("m"))
- isMatrixTask = true;
- else if (parts[3].equals("g"))
- isMatrixTask = false;
- else
- throw new Exception();
- return new TaskID(parts[1], Integer.parseInt(parts[2]), isMatrixTask,
- Integer.parseInt(parts[4]));
+ return new TaskID(parts[1], Integer.parseInt(parts[2]), Integer
+ .parseInt(parts[4]));
}
}
} catch (Exception ex) {
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java Sun Feb 27 10:58:58 2011
@@ -27,7 +27,8 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.mapred.JobStatus;
/**
- *
+ * TaskInProgress maintains all the info needed for a Task in the lifetime of
+ * its owning Job.
*/
class TaskInProgress {
public static final Log LOG = LogFactory.getLog(TaskInProgress.class);
@@ -86,7 +87,7 @@ class TaskInProgress {
this.jobFile = jobFile;
this.partition = partition;
- this.id = new TaskID(jobId, true, partition);
+ this.id = new TaskID(jobId, partition);
}
public TaskInProgress(BSPJobID jobId, String jobFile, BSPMaster master,
@@ -98,7 +99,7 @@ class TaskInProgress {
this.conf = conf;
this.partition = partition;
- this.id = new TaskID(jobId, true, partition);
+ this.id = new TaskID(jobId, partition);
}
/**
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskRunner.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskRunner.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskRunner.java Sun Feb 27 10:58:58 2011
@@ -29,6 +29,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.RunJar;
+/**
+ * Base class that runs a task in a separate process.
+ */
public class TaskRunner extends Thread {
public static final Log LOG = LogFactory.getLog(TaskRunner.class);
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java Sun Feb 27 10:58:58 2011
@@ -27,6 +27,11 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
+/**
+ * Describes the current status of a task. This is not intended to be a
+ * comprehensive piece of data.
+ *
+ */
class TaskStatus implements Writable, Cloneable {
static final Log LOG = LogFactory.getLog(TaskStatus.class);
Modified: incubator/hama/trunk/src/java/org/apache/hama/util/RunJar.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/util/RunJar.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/util/RunJar.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/util/RunJar.java Sun Feb 27 10:58:58 2011
@@ -37,6 +37,9 @@ import java.util.jar.Manifest;
import org.apache.hadoop.fs.FileUtil;
+/**
+ * Run a Hama job jar.
+ */
public class RunJar {
/** Unpack a jar file into a directory. */
Modified: incubator/hama/trunk/src/java/org/apache/hama/util/VersionInfo.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/util/VersionInfo.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/util/VersionInfo.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/util/VersionInfo.java Sun Feb 27 10:58:58 2011
@@ -17,6 +17,9 @@
*/
package org.apache.hama.util;
+/**
+ * A version information class.
+ */
public class VersionInfo {
public static void main(String[] args) {
Modified: incubator/hama/trunk/src/java/org/apache/hama/zookeeper/QuorumPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/zookeeper/QuorumPeer.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/zookeeper/QuorumPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/zookeeper/QuorumPeer.java Sun Feb 27 10:58:58 2011
@@ -42,6 +42,9 @@ import org.apache.zookeeper.server.ZooKe
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.apache.zookeeper.server.quorum.QuorumPeerMain;
+/**
+ * This class starts and runs the QuorumPeers.
+ */
public class QuorumPeer implements Constants {
private static final Log LOG = LogFactory.getLog(QuorumPeer.class);
Modified: incubator/hama/trunk/src/java/org/apache/hama/zookeeper/ZKServerTool.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/zookeeper/ZKServerTool.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/zookeeper/ZKServerTool.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/zookeeper/ZKServerTool.java Sun Feb 27 10:58:58 2011
@@ -23,7 +23,11 @@ import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hama.HamaConfiguration;
+/**
+ * A tool class for ZooKeeper use.
+ */
public class ZKServerTool {
+
/**
* Run the tool.
*
Modified: incubator/hama/trunk/src/test/org/apache/hama/bsp/TestDirective.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/TestDirective.java?rev=1075021&r1=1075020&r2=1075021&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/TestDirective.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/TestDirective.java Sun Feb 27 10:58:58 2011
@@ -30,14 +30,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hama.HamaCluster;
import org.apache.hama.HamaConfiguration;
public class TestDirective extends HamaCluster {
- private Log LOG = LogFactory.getLog(TestDirective.class);
-
HamaConfiguration conf;
public TestDirective() {
@@ -47,12 +43,12 @@ public class TestDirective extends HamaC
public void setUp() throws Exception {
}
- public void testRequest() throws Exception{
+ public void testRequest() throws Exception {
BSPJobID jobId = new BSPJobID();
- TaskID taskId = new TaskID(jobId, true, 0);
+ TaskID taskId = new TaskID(jobId, 0);
TaskAttemptID attemptId = new TaskAttemptID(taskId, 0);
- GroomServerAction[] actions = new GroomServerAction[]{
- new LaunchTaskAction(new BSPTask(jobId, "/path/to/jobFile", attemptId,0))};
+ GroomServerAction[] actions = new GroomServerAction[] { new LaunchTaskAction(
+ new BSPTask(jobId, "/path/to/jobFile", attemptId, 0)) };
Map<String, String> groomServerPeers = new HashMap<String, String>();
groomServerPeers.put("groomServer1", "192.168.0.22:8080");
groomServerPeers.put("groomServer2", "192.168.0.23:8081");
@@ -63,81 +59,73 @@ public class TestDirective extends HamaC
Directive w = new Directive(groomServerPeers, actions);
ByteArrayOutputStream bout = new ByteArrayOutputStream();
- assertEquals("Check if data is serialized to output stream.",
- bout.size(), 0);
+ assertEquals("Check if data is serialized to output stream.", bout.size(),
+ 0);
DataOutput out = new DataOutputStream(bout);
w.write(out);
-
- assertTrue("Check if data is serialized to output stream.",
- bout.size() > 0 );
-
+
+ assertTrue("Check if data is serialized to output stream.", bout.size() > 0);
+
byte[] bytes = bout.toByteArray();
ByteArrayInputStream bin = new ByteArrayInputStream(bytes);
DataInput in = new DataInputStream(bin);
-
+
Directive r = new Directive();
r.readFields(in);
- assertEquals("Check the reqeust type.", Directive.Type.Request.value(),
- r.getType().value());
+ assertEquals("Check the reqeust type.", Directive.Type.Request.value(), r
+ .getType().value());
Map<String, String> peers = r.getGroomServerPeers();
- assertEquals("Check groom server peers content.",
- peers.get("groomServer1"),
- groomServerPeers.get("groomServer1"));
- assertEquals("Check groom server peers content.",
- peers.get("groomServer2"),
- groomServerPeers.get("groomServer2"));
- assertEquals("Check groom server peers content.",
- peers.get("groomServer3"),
- groomServerPeers.get("groomServer3"));
- assertEquals("Check groom server peers content.",
- peers.get("groomServer4"),
- groomServerPeers.get("groomServer4"));
- assertEquals("Check groom server peers content.",
- peers.get("groomServer5"),
- groomServerPeers.get("groomServer5"));
+ assertEquals("Check groom server peers content.",
+ peers.get("groomServer1"), groomServerPeers.get("groomServer1"));
+ assertEquals("Check groom server peers content.",
+ peers.get("groomServer2"), groomServerPeers.get("groomServer2"));
+ assertEquals("Check groom server peers content.",
+ peers.get("groomServer3"), groomServerPeers.get("groomServer3"));
+ assertEquals("Check groom server peers content.",
+ peers.get("groomServer4"), groomServerPeers.get("groomServer4"));
+ assertEquals("Check groom server peers content.",
+ peers.get("groomServer5"), groomServerPeers.get("groomServer5"));
GroomServerAction[] as = r.getActions();
assertEquals("Check GroomServerAction size.", as.length, actions.length);
- assertTrue("Check GroomServerAction type.",
- as[0] instanceof LaunchTaskAction);
+ assertTrue("Check GroomServerAction type.",
+ as[0] instanceof LaunchTaskAction);
- Task t = ((LaunchTaskAction)as[0]).getTask();
+ Task t = ((LaunchTaskAction) as[0]).getTask();
assertEquals("Check action's bsp job id.", t.getJobID(), jobId);
- assertEquals("Check action's job file.",
- t.getJobFile(), "/path/to/jobFile");
+ assertEquals("Check action's job file.", t.getJobFile(), "/path/to/jobFile");
assertEquals("Check action's partition.", t.getPartition(), 0);
}
- public void testResponse() throws Exception{
+ public void testResponse() throws Exception {
BSPJobID jobId = new BSPJobID();
- TaskID taskId = new TaskID(jobId, true, 0);
+ TaskID taskId = new TaskID(jobId, 0);
TaskAttemptID attemptId = new TaskAttemptID(taskId, 0);
List<TaskStatus> tasks = new ArrayList<TaskStatus>();
- tasks.add(new TaskStatus(jobId, attemptId, 1f,
- TaskStatus.State.SUCCEEDED, "", "groomServer1",
- TaskStatus.Phase.CLEANUP));
- GroomServerStatus status = new GroomServerStatus("groomServer1",
- "192.168.1.111:2123", tasks, 1, 4);
+ tasks.add(new TaskStatus(jobId, attemptId, 1f, TaskStatus.State.SUCCEEDED,
+ "", "groomServer1", TaskStatus.Phase.CLEANUP));
+ GroomServerStatus status = new GroomServerStatus("groomServer1",
+ "192.168.1.111:2123", tasks, 1, 4);
Directive w = new Directive(status);
- assertEquals("Check directive type is correct.",
- w.getType().value(), Directive.Type.Response.value());
+ assertEquals("Check directive type is correct.", w.getType().value(),
+ Directive.Type.Response.value());
ByteArrayOutputStream bout = new ByteArrayOutputStream();
DataOutput out = new DataOutputStream(bout);
w.write(out);
- assertTrue("Check if output has data.", bout.size() > 0);
+ assertTrue("Check if output has data.", bout.size() > 0);
byte[] bytes = bout.toByteArray();
@@ -147,26 +135,26 @@ public class TestDirective extends HamaC
Directive r = new Directive();
r.readFields(in);
- assertEquals("Check directive type is correct.",
- r.getType().value(), w.getType().value());
+ assertEquals("Check directive type is correct.", r.getType().value(), w
+ .getType().value());
- assertEquals("Check groom server status' groom name.",
- r.getStatus().getGroomName(), "groomServer1");
-
- assertEquals("Check groom server status' peer name.",
- r.getStatus().getPeerName(), "192.168.1.111:2123");
+ assertEquals("Check groom server status' groom name.", r.getStatus()
+ .getGroomName(), "groomServer1");
+
+ assertEquals("Check groom server status' peer name.", r.getStatus()
+ .getPeerName(), "192.168.1.111:2123");
TaskStatus t = r.getStatus().getTaskReports().get(0);
assertEquals("Check tasks status' job id.", t.getJobId(), jobId);
-
- assertEquals("Check task status' run state.",
- t.getRunState(), TaskStatus.State.SUCCEEDED);
-
- assertEquals("Check task status' state.",
- t.getPhase(), TaskStatus.Phase.CLEANUP);
-
+
+ assertEquals("Check task status' run state.", t.getRunState(),
+ TaskStatus.State.SUCCEEDED);
+
+ assertEquals("Check task status' state.", t.getPhase(),
+ TaskStatus.Phase.CLEANUP);
+
}
- public void tearDown() throws Exception{
+ public void tearDown() throws Exception {
}
}