You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2010/11/09 07:43:09 UTC
svn commit: r1032869 - in /incubator/hama/trunk: ./
src/java/org/apache/hama/bsp/ src/test/org/apache/hama/bsp/
Author: edwardyoon
Date: Tue Nov 9 06:43:08 2010
New Revision: 1032869
URL: http://svn.apache.org/viewvc?rev=1032869&view=rev
Log:
Add superstep counter
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/RunningJob.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java
incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1032869&r1=1032868&r2=1032869&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Nov 9 06:43:08 2010
@@ -50,6 +50,7 @@ Trunk (unreleased changes)
IMPROVEMENTS
+ HAMA-282: Add superstep counter (Filipe Manana via edwardyoon)
HAMA-327: Add overview for javadoc (edwardyoon)
HAMA-325: Add package.html to each package and few comments (edwardyoon)
HAMA-322: JUnit test threads with GroboTestingJUnit (Filipe Manana via edwardyoon)
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java?rev=1032869&r1=1032868&r2=1032869&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java Tue Nov 9 06:43:08 2010
@@ -130,6 +130,11 @@ public class BSPJobClient extends Config
return status.getRunState() == JobStatus.SUCCEEDED;
}
+ public synchronized long getSuperstepCount() throws IOException {
+ ensureFreshStatus();
+ return status.getSuperstepCount();
+ }
+
/**
* Blocks until the job is finished
*/
@@ -385,22 +390,46 @@ public class BSPJobClient extends Config
IOException {
BSPJobClient jc = new BSPJobClient(job.getConf());
RunningJob running = jc.submitJobInternal(job);
- String jobId = running.getID().toString();
- LOG.info("Running job: " + jobId);
+ BSPJobID jobId = running.getID();
+ LOG.info("Running job: " + jobId.toString());
while (true) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+
if (running.isComplete()) {
- LOG.info("Job complete: " + jobId);
break;
}
+
+ running = jc.getJob(jobId);
}
+ LOG.info("Job complete: " + jobId);
+ LOG.info("The total number of supersteps: " + running.getSuperstepCount());
+
// TODO if error found, kill job
// running.killJob();
jc.close();
}
/**
+ * Get a RunningJob object to track an ongoing job. Returns null if the id
+ * does not correspond to any known job.
+ *
+ * @throws IOException
+ */
+ private RunningJob getJob(BSPJobID jobId) throws IOException {
+ JobStatus status = jobSubmitClient.getJobStatus(jobId);
+ if (status != null) {
+ return new NetworkedJob(status);
+ } else {
+ return null;
+ }
+ }
+
+ /**
* Get status information about the BSP cluster
*
* @param detailed if true then get a detailed status including the
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=1032869&r1=1032868&r2=1032869&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Tue Nov 9 06:43:08 2010
@@ -61,6 +61,7 @@ public class BSPPeer implements Watcher,
protected final ConcurrentLinkedQueue<BSPMessage> localQueue = new ConcurrentLinkedQueue<BSPMessage>();
protected Set<String> allPeerNames = new HashSet<String>();
protected InetSocketAddress peerAddress;
+ protected TaskStatus currentTaskStatus;
/**
*
@@ -68,8 +69,10 @@ public class BSPPeer implements Watcher,
public BSPPeer(Configuration conf) throws IOException {
this.conf = conf;
- String bindAddress = conf.get(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST);
- int bindPort = conf.getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
+ String bindAddress = conf.get(Constants.PEER_HOST,
+ Constants.DEFAULT_PEER_HOST);
+ int bindPort = conf
+ .getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
bspRoot = conf.get(Constants.ZOOKEEPER_ROOT,
Constants.DEFAULT_ZOOKEEPER_ROOT);
zookeeperAddr = conf.get(Constants.ZOOKEEPER_QUORUM)
@@ -84,7 +87,8 @@ public class BSPPeer implements Watcher,
public void reinitialize() {
try {
LOG.debug("reinitialize(): " + getPeerName());
- server = RPC.getServer(this, peerAddress.getHostName(), peerAddress.getPort(), conf);
+ server = RPC.getServer(this, peerAddress.getHostName(), peerAddress
+ .getPort(), conf);
server.start();
} catch (IOException e) {
e.printStackTrace();
@@ -129,8 +133,7 @@ public class BSPPeer implements Watcher,
* org.apache.hadoop.io.Writable, org.apache.hadoop.io.Writable)
*/
@Override
- public void send(String peerName, BSPMessage msg)
- throws IOException {
+ public void send(String peerName, BSPMessage msg) throws IOException {
LOG.debug("Send bytes (" + msg.getData().toString() + ") to " + peerName);
ConcurrentLinkedQueue<BSPMessage> queue = outgoingQueues.get(peerName);
if (queue == null) {
@@ -170,7 +173,7 @@ public class BSPPeer implements Watcher,
}
}
- // Should we clearing outgoingQueues?
+ // Clear outgoing queues.
this.outgoingQueues.clear();
enterBarrier();
@@ -180,13 +183,14 @@ public class BSPPeer implements Watcher,
// the number of peers, and the load of zookeeper.
// It should be fixed in some flawless way.
leaveBarrier();
+ currentTaskStatus.incrementSuperstepCount();
}
protected boolean enterBarrier() throws KeeperException, InterruptedException {
LOG.debug("[" + getPeerName() + "] enter the enterbarrier");
try {
- zk.create(bspRoot + "/" + getPeerName(), new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.EPHEMERAL);
+ zk.create(bspRoot + "/" + getPeerName(), new byte[0],
+ Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
@@ -265,7 +269,7 @@ public class BSPPeer implements Watcher,
}
/**
- * @return the string as host:port of this Peer
+ * @return the string as host:port of this Peer
*/
public String getPeerName() {
return peerAddress.getHostName() + ":" + peerAddress.getPort();
@@ -292,9 +296,26 @@ public class BSPPeer implements Watcher,
this.allPeerNames = new HashSet<String>(allPeerNames);
}
- @Override
+ /**
+ * @return the number of messages
+ */
public int getNumCurrentMessages() {
return localQueue.size();
}
+ /**
+ * Sets the current status
+ *
+ * @param currentTaskStatus
+ */
+ public void setCurrentTaskStatus(TaskStatus currentTaskStatus) {
+ this.currentTaskStatus = currentTaskStatus;
+ }
+
+ /**
+ * @return the count of current super-step
+ */
+ public long getSuperstepCount() {
+ return currentTaskStatus.getSuperstepCount();
+ }
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java?rev=1032869&r1=1032868&r2=1032869&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java Tue Nov 9 06:43:08 2010
@@ -60,6 +60,11 @@ public interface BSPPeerInterface extend
public void sync() throws IOException, KeeperException, InterruptedException;
/**
+ * @return the count of current super-step
+ */
+ public long getSuperstepCount();
+
+ /**
* @return The name of this peer in the format "hostname:port".
*/
public String getPeerName();
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1032869&r1=1032868&r2=1032869&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Tue Nov 9 06:43:08 2010
@@ -534,6 +534,7 @@ public class GroomServer implements Runn
public void launchTask() throws IOException {
taskStatus.setRunState(TaskStatus.State.RUNNING);
+ bspPeer.setCurrentTaskStatus(taskStatus);
this.runner = task.createRunner(bspPeer, this.jobConf);
this.runner.start();
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java?rev=1032869&r1=1032868&r2=1032869&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java Tue Nov 9 06:43:08 2010
@@ -28,11 +28,10 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
-/*************************************************************
+/**
* JobInProgress maintains all the info for keeping a Job on the straight and
* narrow. It keeps its JobProfile and its latest JobStatus, plus a set of
- * tables for doing bookkeeping of its Tasks.
- * ***********************************************************
+ * tables for doing bookkeeping of its Tasks.
*/
class JobInProgress {
/**
@@ -64,6 +63,7 @@ class JobInProgress {
private BSPJobID jobId;
final BSPMaster master;
List<TaskInProgress> tasks;
+ private long superstepCounter;
public JobInProgress(BSPJobID jobId, BSPMaster master, Configuration conf)
throws IOException {
@@ -77,6 +77,7 @@ class JobInProgress {
this.status = new JobStatus(jobId, 0.0f, 0.0f, JobStatus.PREP);
this.startTime = System.currentTimeMillis();
status.setStartTime(startTime);
+ this.superstepCounter = 0;
this.localJobFile = master.getLocalPath(BSPMaster.SUBDIR + "/" + jobId
+ ".xml");
@@ -96,7 +97,7 @@ class JobInProgress {
if (jarFile != null) {
fs.copyToLocalFile(new Path(jarFile), localJarFile);
}
-
+
}
// ///////////////////////////////////////////////////
@@ -141,7 +142,7 @@ class JobInProgress {
public synchronized Task obtainNewTask(GroomServerStatus status,
int clusterSize, int numUniqueHosts) {
LOG.debug("clusterSize: " + clusterSize);
-
+
Task result = null;
try {
TaskInProgress tip = new TaskInProgress(getJobID(), this.jobFile
@@ -175,18 +176,24 @@ class JobInProgress {
this.status = new JobStatus(this.status.getJobID(), 1.0f, 1.0f,
JobStatus.RUNNING);
-
- if(allDone) {
+
+ if (allDone) {
LOG.debug("Job successfully done.");
-
- this.status = new JobStatus(this.status.getJobID(), 1.0f, 1.0f,
- JobStatus.SUCCEEDED);
- garbageCollect();
+
+ this.status = new JobStatus(this.status.getJobID(), 1.0f, 1.0f, 1.0f,
+ JobStatus.SUCCEEDED, superstepCounter);
+ garbageCollect();
}
}
- public void updateTaskStatus(TaskInProgress tip, TaskStatus status) {
- tip.updateStatus(status); // update tip
+ public synchronized void updateTaskStatus(TaskInProgress tip, TaskStatus taskStatus) {
+ tip.updateStatus(taskStatus); // update tip
+
+ if (superstepCounter < taskStatus.getSuperstepCount()) {
+ superstepCounter = taskStatus.getSuperstepCount();
+ // TODO Later, we have to update JobInProgress status here
+
+ }
}
public void kill() {
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java?rev=1032869&r1=1032868&r2=1032869&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java Tue Nov 9 06:43:08 2010
@@ -51,7 +51,8 @@ public class JobStatus implements Writab
private long startTime;
private String schedulingInfo = "NA";
private String user;
-
+ private long superstepCount;
+
public JobStatus() {
}
@@ -66,13 +67,19 @@ public class JobStatus implements Writab
public JobStatus(BSPJobID jobid, float setupProgress, float progress,
float cleanupProgress, int runState) {
+ this(jobid, 0.0f, progress, cleanupProgress, runState, 0);
+ }
+
+ public JobStatus(BSPJobID jobid, float setupProgress, float progress,
+ float cleanupProgress, int runState, long superstepCount) {
this.jobid = jobid;
this.setupProgress = setupProgress;
this.progress = progress;
this.cleanupProgress = cleanupProgress;
this.runState = runState;
+ this.superstepCount = superstepCount;
}
-
+
public BSPJobID getJobID() {
return jobid;
}
@@ -109,6 +116,10 @@ public class JobStatus implements Writab
this.runState = state;
}
+ public synchronized long getSuperstepCount() {
+ return superstepCount;
+ }
+
synchronized void setStartTime(long startTime) {
this.startTime = startTime;
}
@@ -160,6 +171,7 @@ public class JobStatus implements Writab
out.writeInt(runState);
out.writeLong(startTime);
Text.writeString(out, schedulingInfo);
+ out.writeLong(superstepCount);
}
public synchronized void readFields(DataInput in) throws IOException {
@@ -171,5 +183,7 @@ public class JobStatus implements Writab
this.runState = in.readInt();
this.startTime = in.readLong();
this.schedulingInfo = Text.readString(in);
+ this.superstepCount = in.readLong();
}
+
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/RunningJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/RunningJob.java?rev=1032869&r1=1032868&r2=1032869&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/RunningJob.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/RunningJob.java Tue Nov 9 06:43:08 2010
@@ -111,4 +111,6 @@ public interface RunningJob {
*/
public void killTask(TaskAttemptID taskId, boolean shouldFail)
throws IOException;
+
+ public long getSuperstepCount() throws IOException;
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java?rev=1032869&r1=1032868&r2=1032869&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java Tue Nov 9 06:43:08 2010
@@ -194,4 +194,8 @@ class TaskInProgress {
public void updateStatus(TaskStatus status) {
taskStatuses.put(status.getTaskId(), status);
}
+
+ public TaskStatus getTaskStatus(String taskId) {
+ return this.taskStatuses.get(taskId);
+ }
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java?rev=1032869&r1=1032868&r2=1032869&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java Tue Nov 9 06:43:08 2010
@@ -45,7 +45,8 @@ class TaskStatus implements Writable, Cl
private volatile State runState;
private String stateString;
private String groomServer;
-
+ private long superstepCount;
+
private long startTime;
private long finishTime;
@@ -56,6 +57,7 @@ class TaskStatus implements Writable, Cl
*/
public TaskStatus() {
taskId = new String();
+ this.superstepCount = 0;
}
public TaskStatus(String taskId, float progress, State runState,
@@ -66,6 +68,7 @@ class TaskStatus implements Writable, Cl
this.stateString = stateString;
this.groomServer = groomServer;
this.phase = phase;
+ this.superstepCount = 0;
}
// //////////////////////////////////////////////////
@@ -208,6 +211,20 @@ class TaskStatus implements Writable, Cl
this.finishTime = finishTime;
}
}
+
+ /**
+ * @return The number of BSP super steps executed by the task.
+ */
+ public long getSuperstepCount() {
+ return superstepCount;
+ }
+
+ /**
+ * Increments the number of BSP super steps executed by the task.
+ */
+ public void incrementSuperstepCount() {
+ superstepCount += 1;
+ }
@Override
public Object clone() {
@@ -232,6 +249,7 @@ class TaskStatus implements Writable, Cl
this.phase = WritableUtils.readEnum(in, Phase.class);
this.startTime = in.readLong();
this.finishTime = in.readLong();
+ this.superstepCount = in.readLong();
}
@Override
@@ -243,5 +261,6 @@ class TaskStatus implements Writable, Cl
WritableUtils.writeEnum(out, phase);
out.writeLong(startTime);
out.writeLong(finishTime);
+ out.writeLong(superstepCount);
}
}
Modified: incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java?rev=1032869&r1=1032868&r2=1032869&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java Tue Nov 9 06:43:08 2010
@@ -27,7 +27,6 @@ import java.util.Random;
import java.util.Set;
import junit.framework.AssertionFailedError;
-
import net.sourceforge.groboutils.junit.v1.MultiThreadedTestRunner;
import net.sourceforge.groboutils.junit.v1.TestRunnable;
@@ -97,6 +96,9 @@ public class TestBSPPeer extends HamaClu
peerNames.add("localhost:" + (30000 + i));
}
peer.setAllPeerNames(peerNames);
+ TaskStatus currentTaskStatus = new TaskStatus("localhost:"
+ + lastTwoDigitsOfPort, 0, null, null, null, null);
+ peer.setCurrentTaskStatus(currentTaskStatus);
}
@Override
@@ -135,14 +137,16 @@ public class TestBSPPeer extends HamaClu
e.printStackTrace();
}
- verifyPayload();
+ verifyPayload(i);
}
}
- private void verifyPayload() {
+ private void verifyPayload(int round) {
int numMessages = peer.getNumCurrentMessages();
+ assertEquals(round, ((int) peer.getSuperstepCount() -1 ));
+
LOG.info("[" + peer.getPeerName() + "] verifying " + numMessages
- + " messages");
+ + " messages at " + round + " round");
if (lastTwoDigitsOfPort < 10) {
assertEquals(20, numMessages);
@@ -169,7 +173,7 @@ public class TestBSPPeer extends HamaClu
}
}
- public void testSync() throws Throwable {
+ public void testSync() throws Throwable {
conf.setInt("bsp.peers.num", NUM_PEER);
conf.set(Constants.ZOOKEEPER_QUORUM, "localhost");