You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2010/11/10 02:29:33 UTC
svn commit: r1033317 - in /incubator/hama/trunk: ./
src/examples/org/apache/hama/examples/ src/java/org/apache/hama/bsp/
src/test/org/apache/hama/bsp/
Author: edwardyoon
Date: Wed Nov 10 01:29:33 2010
New Revision: 1033317
URL: http://svn.apache.org/viewvc?rev=1033317&view=rev
Log:
Allows user to set the number of tasks
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1033317&r1=1033316&r2=1033317&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Wed Nov 10 01:29:33 2010
@@ -50,6 +50,7 @@ Trunk (unreleased changes)
IMPROVEMENTS
+ HAMA-321: Allows user to set the number of tasks (edwardyoon)
HAMA-282: Add superstep counter (Filipe Manana via edwardyoon)
HAMA-327: Add overview for javadoc (edwardyoon)
HAMA-325: Add package.html to each package and few comments (edwardyoon)
Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=1033317&r1=1033316&r2=1033317&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java Wed Nov 10 01:29:33 2010
@@ -94,14 +94,13 @@ public class PiEstimator {
IOException {
// BSP job configuration
HamaConfiguration conf = new HamaConfiguration();
- // Execute locally
- // conf.set("bsp.master.address", "local");
BSPJob bsp = new BSPJob(conf, PiEstimator.class);
// Set the job name
bsp.setJobName("pi estimation example");
bsp.setBspClass(MyEstimator.class);
-
+ bsp.setNumBspTask(10);
+
BSPJobClient jobClient = new BSPJobClient(conf);
ClusterStatus cluster = jobClient.getClusterStatus(true);
// Choose one as a master
Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java?rev=1033317&r1=1033316&r2=1033317&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java Wed Nov 10 01:29:33 2010
@@ -70,14 +70,13 @@ public class SerializePrinting {
IOException {
// BSP job configuration
HamaConfiguration conf = new HamaConfiguration();
- // Execute locally
- // conf.set("bsp.master.address", "local");
BSPJob bsp = new BSPJob(conf, SerializePrinting.class);
// Set the job name
bsp.setJobName("serialize printing");
bsp.setBspClass(HelloBSP.class);
-
+ bsp.setNumBspTask(10);
+
BSPJobClient.runJob(bsp);
}
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java?rev=1033317&r1=1033316&r2=1033317&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java Wed Nov 10 01:29:33 2010
@@ -38,20 +38,34 @@ public class BSPJob extends BSPJobContex
this(new HamaConfiguration());
}
- public BSPJob(HamaConfiguration conf, String jobName) throws IOException {
- this(conf);
- setJobName(jobName);
- }
-
public BSPJob(HamaConfiguration conf) throws IOException {
super(conf, null);
jobClient = new BSPJobClient(conf);
}
+
+ public BSPJob(HamaConfiguration conf, String jobName) throws IOException {
+ this(conf);
+ setJobName(jobName);
+ }
public BSPJob(BSPJobID jobID, String jobFile) throws IOException {
super(new Path(jobFile), jobID);
}
+ /**
+ * Only for unit test
+ *
+ * TODO will be deleted when miniBSPCluster implemented
+ *
+ * @param conf
+ * @param tasks
+ * @throws IOException
+ */
+ public BSPJob(HamaConfiguration conf, int tasks) throws IOException {
+ super(conf, null);
+ setNumBspTask(tasks);
+ }
+
@SuppressWarnings("unchecked")
public BSPJob(HamaConfiguration conf, Class exampleClass) throws IOException {
this(conf);
@@ -200,4 +214,7 @@ public class BSPJob extends BSPJobContex
conf.setInt("bsp.peers.num", tasks);
}
+ public int getNumBspTask() {
+ return conf.getInt("bsp.peers.num", 1);
+ }
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java?rev=1033317&r1=1033316&r2=1033317&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java Wed Nov 10 01:29:33 2010
@@ -134,7 +134,7 @@ public class BSPJobClient extends Config
ensureFreshStatus();
return status.getSuperstepCount();
}
-
+
/**
* Blocks until the job is finished
*/
@@ -389,6 +389,15 @@ public class BSPJobClient extends Config
public static void runJob(BSPJob job) throws FileNotFoundException,
IOException {
BSPJobClient jc = new BSPJobClient(job.getConf());
+
+ // TODO this code must be removed
+ // when GroomServer supports the multiple tasks.
+ if (job.getNumBspTask() > jc.getClusterStatus(false).getGroomServers()) {
+ // If the number of tasks is greater than the number of GroomServer,
+ // reset the number of tasks as number of GroomServer.
+ job.setNumBspTask(jc.getClusterStatus(false).getGroomServers());
+ }
+
RunningJob running = jc.submitJobInternal(job);
BSPJobID jobId = running.getID();
LOG.info("Running job: " + jobId.toString());
@@ -408,7 +417,7 @@ public class BSPJobClient extends Config
LOG.info("Job complete: " + jobId);
LOG.info("The total number of supersteps: " + running.getSuperstepCount());
-
+
// TODO if error found, kill job
// running.killJob();
jc.close();
@@ -418,7 +427,7 @@ public class BSPJobClient extends Config
* Get an RunningJob object to track an ongoing job. Returns null if the id
* does not correspond to any known job.
*
- * @throws IOException
+ * @throws IOException
*/
private RunningJob getJob(BSPJobID jobId) throws IOException {
JobStatus status = jobSubmitClient.getJobStatus(jobId);
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=1033317&r1=1033316&r2=1033317&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Wed Nov 10 01:29:33 2010
@@ -46,25 +46,25 @@ import org.apache.zookeeper.data.Stat;
public class BSPPeer implements Watcher, BSPPeerInterface {
public static final Log LOG = LogFactory.getLog(BSPPeer.class);
- protected Configuration conf;
+ private Configuration conf;
+ private BSPJob jobConf;
- protected InetSocketAddress masterAddr = null;
- protected Server server = null;
- protected ZooKeeper zk = null;
- protected volatile Integer mutex = 0;
-
- protected final String bspRoot;
- protected final String zookeeperAddr;
-
- protected final Map<InetSocketAddress, BSPPeerInterface> peers = new ConcurrentHashMap<InetSocketAddress, BSPPeerInterface>();
- protected final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>();
- protected final ConcurrentLinkedQueue<BSPMessage> localQueue = new ConcurrentLinkedQueue<BSPMessage>();
- protected Set<String> allPeerNames = new HashSet<String>();
- protected InetSocketAddress peerAddress;
- protected TaskStatus currentTaskStatus;
+ private Server server = null;
+ private ZooKeeper zk = null;
+ private volatile Integer mutex = 0;
+
+ private final String bspRoot;
+ private final String zookeeperAddr;
+
+ private final Map<InetSocketAddress, BSPPeerInterface> peers = new ConcurrentHashMap<InetSocketAddress, BSPPeerInterface>();
+ private final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>();
+ private final ConcurrentLinkedQueue<BSPMessage> localQueue = new ConcurrentLinkedQueue<BSPMessage>();
+ private Set<String> allPeerNames = new HashSet<String>();
+ private InetSocketAddress peerAddress;
+ private TaskStatus currentTaskStatus;
/**
- *
+ * Constructor
*/
public BSPPeer(Configuration conf) throws IOException {
this.conf = conf;
@@ -174,7 +174,7 @@ public class BSPPeer implements Watcher,
}
// Clear outgoing queues.
- this.outgoingQueues.clear();
+ clearOutgoingQueues();
enterBarrier();
Thread.sleep(Constants.ATLEAST_WAIT_TIME); // TODO - This is temporary work
@@ -201,8 +201,7 @@ public class BSPPeer implements Watcher,
synchronized (mutex) {
List<String> list = zk.getChildren(bspRoot, true);
- // TODO it must be same with the number of slave nodes, at this time.
- if (list.size() < conf.getInt("bsp.peers.num", 0)) {
+ if (list.size() < jobConf.getNumBspTask()) {
mutex.wait();
} else {
return true;
@@ -318,4 +317,41 @@ public class BSPPeer implements Watcher,
public long getSuperstepCount() {
return currentTaskStatus.getSuperstepCount();
}
+
+ /**
+ * Sets the job configuration
+ *
+ * @param jobConf
+ */
+ public void setJobConf(BSPJob jobConf) {
+ this.jobConf = jobConf;
+ }
+
+ /**
+ * @return the size of local queue
+ */
+ public int getLocalQueueSize() {
+ return localQueue.size();
+ }
+
+ /**
+ * @return the size of outgoing queue
+ */
+ public int getOutgoingQueueSize() {
+ return outgoingQueues.size();
+ }
+
+ /**
+ * Clears local queue
+ */
+ public void clearLocalQueue() {
+ this.localQueue.clear();
+ }
+
+ /**
+ * Clears outgoing queues
+ */
+ public void clearOutgoingQueues() {
+ this.outgoingQueues.clear();
+ }
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1033317&r1=1033316&r2=1033317&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Wed Nov 10 01:29:33 2010
@@ -534,6 +534,7 @@ public class GroomServer implements Runn
public void launchTask() throws IOException {
taskStatus.setRunState(TaskStatus.State.RUNNING);
+ bspPeer.setJobConf(jobConf);
bspPeer.setCurrentTaskStatus(taskStatus);
this.runner = task.createRunner(bspPeer, this.jobConf);
this.runner.start();
@@ -546,8 +547,8 @@ public class GroomServer implements Runn
e.printStackTrace();
}
- if (bspPeer.localQueue.size() == 0
- && bspPeer.outgoingQueues.size() == 0
+ if (bspPeer.getLocalQueueSize() == 0
+ && bspPeer.getOutgoingQueueSize() == 0
&& !runner.isAlive()) {
taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
acceptNewTasks = true;
Modified: incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java?rev=1033317&r1=1033316&r2=1033317&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java Wed Nov 10 01:29:33 2010
@@ -32,9 +32,9 @@ import net.sourceforge.groboutils.junit.
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hama.Constants;
import org.apache.hama.HamaCluster;
+import org.apache.hama.HamaConfiguration;
import org.apache.hama.util.Bytes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -51,7 +51,7 @@ public class TestBSPPeer extends HamaClu
private static final int ROUND = 3;
private static final int PAYLOAD = 1024; // 1kb in default
List<BSPPeerThread> list = new ArrayList<BSPPeerThread>(NUM_PEER);
- Configuration conf;
+ HamaConfiguration conf;
private Random r = new Random();
public TestBSPPeer() {
@@ -88,7 +88,7 @@ public class TestBSPPeer extends HamaClu
private int MAXIMUM_DURATION = 5;
private int lastTwoDigitsOfPort;
- public BSPPeerThread(Configuration conf) throws IOException {
+ public BSPPeerThread(HamaConfiguration conf) throws IOException {
lastTwoDigitsOfPort = conf.getInt(Constants.PEER_PORT, 0) - 30000;
this.peer = new BSPPeer(conf);
Set<String> peerNames = new HashSet<String>(NUM_PEER);
@@ -99,6 +99,8 @@ public class TestBSPPeer extends HamaClu
TaskStatus currentTaskStatus = new TaskStatus("localhost:"
+ lastTwoDigitsOfPort, 0, null, null, null, null);
peer.setCurrentTaskStatus(currentTaskStatus);
+ BSPJob jobConf = new BSPJob(conf, NUM_PEER);
+ peer.setJobConf((BSPJob) jobConf);
}
@Override
@@ -165,7 +167,7 @@ public class TestBSPPeer extends HamaClu
LOG.error(e);
}
- peer.localQueue.clear();
+ peer.clearLocalQueue();
}
public BSPPeer getBSPPeer() {
@@ -174,12 +176,12 @@ public class TestBSPPeer extends HamaClu
}
public void testSync() throws Throwable {
-
+
conf.setInt("bsp.peers.num", NUM_PEER);
conf.set(Constants.ZOOKEEPER_QUORUM, "localhost");
conf.set(Constants.PEER_HOST, "localhost");
conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810");
-
+
TestRunnable[] threads = new TestRunnable[NUM_PEER];
for (int i = 0; i < NUM_PEER; i++) {