Posted to commits@hama.apache.org by ed...@apache.org on 2012/01/26 09:43:57 UTC
svn commit: r1236074 - in /incubator/hama/trunk/src/site:
resources/css/site.css xdoc/hama_bsp_tutorial.xml
Author: edwardyoon
Date: Thu Jan 26 08:43:57 2012
New Revision: 1236074
URL: http://svn.apache.org/viewvc?rev=1236074&view=rev
Log:
Add tutorial page.
Modified:
incubator/hama/trunk/src/site/resources/css/site.css
incubator/hama/trunk/src/site/xdoc/hama_bsp_tutorial.xml
Modified: incubator/hama/trunk/src/site/resources/css/site.css
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/site/resources/css/site.css?rev=1236074&r1=1236073&r2=1236074&view=diff
==============================================================================
--- incubator/hama/trunk/src/site/resources/css/site.css (original)
+++ incubator/hama/trunk/src/site/resources/css/site.css Thu Jan 26 08:43:57 2012
@@ -30,10 +30,20 @@ body a:hover {
color: #888800;
}
-h2, h3, h4 {
+h2, h3, h4, h5 {
color: #74240f;
}
+h5 {
+ font-size: 15px;
+ margin-top: -0.8em;
+}
+
+table {
+ width: 95%;
+ margin-left:2em;
+}
+
ul {
margin-left:10px;
}
Modified: incubator/hama/trunk/src/site/xdoc/hama_bsp_tutorial.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/site/xdoc/hama_bsp_tutorial.xml?rev=1236074&r1=1236073&r2=1236074&view=diff
==============================================================================
--- incubator/hama/trunk/src/site/xdoc/hama_bsp_tutorial.xml (original)
+++ incubator/hama/trunk/src/site/xdoc/hama_bsp_tutorial.xml Thu Jan 26 08:43:57 2012
@@ -23,8 +23,279 @@ xsi:schemaLocation="http://maven.apache.
<section name="Hama BSP Tutorial"></section>
<p>This document describes the Hama BSP framework and serves as a tutorial.</p>
<subsection name="Overview"></subsection>
- <subsection name="Inputs and Outputs"></subsection>
- <subsection name="BSP user interfaces"></subsection>
- <subsection name="Example: Pi calculation"></subsection>
+ <p>Hama provides a pure BSP (Bulk Synchronous Parallel) model for message passing and collective communication.
+ A BSP program consists of a sequence of supersteps. Each superstep consists of the following three phases:</p>
+
+ <ul>
+ <li>Local computation</li>
+ <li>Process communication</li>
+ <li>Barrier synchronization</li>
+ </ul>
+ <p>BSP programming enables you to write high-performance parallel computing algorithms for a wide range of scientific problems.</p>
+
+ <subsection name="Create your own BSP by extending BSP class"></subsection>
+
+ <p>To create your own BSP program, write a class that extends the org.apache.hama.bsp.<b>BSP</b> class.
+ <br/>
+ The extending class must override the bsp() method, which is declared like this:
+ </p>
+ <pre class="green">
+ public abstract void bsp(BSPPeer<K1, V1, K2, V2> peer) throws IOException,
+ SyncException, InterruptedException;</pre>
+
+ <p>
+ You will define the BSP program inside this bsp() method.
+ It is important to understand that one call to bsp() is not a single superstep.
+ As described above, a BSP program consists of a sequence of supersteps.
+ The method is therefore called only once and contains the whole sequence; it is not called repeatedly like a Mapper or Reducer method.
+ <br/><br/>
+ <b>NOTE</b>: Optionally, there are also setup() and cleanup() methods, which are called at the beginning and at the end of your computation, respectively.
+ cleanup() is guaranteed to run after the computation or in case of failure.
+ You can simply override the methods you need from BSP class.
+ <br/><br/>
+ After your own BSP is created, you will need to configure a <b>BSPJob</b> and submit it to a Hama cluster to execute the job.
+ The BSP job configuration and submission interfaces are almost the same as the MapReduce job configuration:</p>
+ <pre class="green">
+ HamaConfiguration conf = new HamaConfiguration();
+ BSPJob job = new BSPJob(conf, MyBSP.class);
+ job.setJobName("My BSP program");
+ job.setBspClass(MyBSP.class);
+ job.setInputFormat(NullInputFormat.class);
+ job.setOutputKeyClass(Text.class);
+ ...
+ job.waitForCompletion(true);</pre>
+ <p>See the section below for a more detailed description of the BSP user interfaces.</p>
+
+ <subsection name="User Interfaces"></subsection>
+ <h5>Inputs and Outputs</h5>
+ <p>When setting up a BSPJob, you can provide an Input/OutputFormat and Paths like this:</p>
+ <pre class="green">
+ job.setInputPath(new Path("/tmp/sequence.dat"));
+ job.setInputFormat(org.apache.hama.bsp.SequenceFileInputFormat.class);
+ or,
+ SequenceFileInputFormat.addInputPath(job, new Path("/tmp/sequence.dat"));
+ or,
+ SequenceFileInputFormat.addInputPaths(job, "/tmp/seq1.dat,/tmp/seq2.dat,/tmp/seq3.dat");
+
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(IntWritable.class);
+ job.setOutputFormat(TextOutputFormat.class);
+ FileOutputFormat.setOutputPath(job, new Path("/tmp/result"));
+ </pre>
+
+ <p>Then, you can read the input and write the output from the methods of the BSP class, which take a "BSPPeer" as a parameter. The BSPPeer contains the communication, counter, and IO interfaces.
+ In this case we read a normal text file:</p>
+
+ <pre class="green">
+ @Override
+ public final void bsp(
+ BSPPeer<LongWritable, Text, Text, LongWritable> peer)
+ throws IOException, InterruptedException, SyncException {
+
+ // this method reads the next key value record from file
+ KeyValuePair<LongWritable, Text> pair = peer.readNext();
+
+ // the following lines do the same:
+ LongWritable key = new LongWritable();
+ Text value = new Text();
+ peer.readNext(key, value);
+
+ // write
+ peer.write(value, key);
+ }</pre>
+ <p>Consult the docs for more detail on events like end of file.
+ There is also a function which allows you to re-read the input from the beginning.
+ This snippet reads the input five times:
+ </p>
+ <pre class="green">
+ for(int i = 0; i < 5; i++){
+ LongWritable key = new LongWritable();
+ Text value = new Text();
+ while (peer.readNext(key, value)) {
+ // read everything
+ }
+ // reopens the input
+ peer.reopenInput();
+ }</pre>
+
+ <h5>Communication</h5>
+ <p>Hama BSP provides simple but powerful communication APIs for many purposes.
+ We tried to follow the standard libraries of the BSP world as much as possible.
+ The following table describes all the methods you can use:</p>
+
+<table align="center" border="0">
+<tr><td><b>Method</b></td><td><b>Description</b></td></tr>
+<tr><td>send(String peerName, BSPMessage msg)</td><td>Sends a message to another peer.</td></tr>
+<tr><td>getCurrentMessage()</td><td>Returns a received message.</td></tr>
+<tr><td>getNumCurrentMessages()</td><td>Returns the number of received messages.</td></tr>
+<tr><td>sync()</td><td>Barrier synchronization.</td></tr>
+<tr><td>getPeerName()</td><td>Returns a peer's hostname.</td></tr>
+<tr><td>getAllPeerNames()</td><td>Returns the hostnames of all peers.</td></tr>
+<tr><td>getSuperstepCount()</td><td>Returns the count of supersteps.</td></tr>
+</table>
+
+ <p>The send() and all the other functions are very flexible. Here is an example that sends a message to all peers:</p>
+
+ <pre class="green">
+ @Override
+ public void bsp(
+ BSPPeer<NullWritable, NullWritable, Text, DoubleWritable> peer)
+ throws IOException, SyncException, InterruptedException {
+ for (String peerName : peer.getAllPeerNames()) {
+ peer.send(peerName,
+ new LongMessage("Hello from " + peer.getPeerName(), System.currentTimeMillis()));
+ }
+
+ peer.sync();
+ }</pre>
+
+ <h5>Synchronization</h5>
+
+ <p>When all the processes have entered the barrier via the sync() method,
+ Hama proceeds to the next superstep.
+ In the previous example, the BSP job finishes after a single synchronization,
+ once the message "Hello from ..." has been sent to all peers.
+ <br/><br/>
+ Keep in mind, however, that calling sync() does not end the BSP job.
+ As previously mentioned, all the communication functions are very flexible.
+ For example, the sync() method can also be called in a for loop,
+ so that you can program iterative methods sequentially:</p>
+
+ <pre class="green">
+ @Override
+ public void bsp(
+ BSPPeer<NullWritable, NullWritable, Text, DoubleWritable> peer)
+ throws IOException, SyncException, InterruptedException {
+ for (int i = 0; i < 100; i++) {
+ // send some messages
+ peer.sync();
+ }
+ }</pre>
+
+
+ <subsection name="Shell Command Line Interfaces"></subsection>
+ <p>Hama provides several commands for BSP job administration:</p>
+
+<table align="center" border="0">
+<tr><td><b>Command</b></td><td><b>Description</b></td></tr>
+<tr><td>-submit <job-file></td><td>Submits the job.</td></tr>
+<tr><td>-status <job-id></td><td>Prints the job status.</td></tr>
+<tr><td>-kill <job-id></td><td>Kills the job.</td></tr>
+<tr><td>-list [all]</td><td>-list all displays all jobs. -list displays only jobs which are yet to be completed.</td></tr>
+<tr><td>-list-active-grooms</td><td>Displays the list of active groom servers in the cluster.</td></tr>
+<tr><td>-list-attempt-ids <jobId> <task-state></td><td>Displays the list of tasks for a given job currently in a particular state (running or completed).</td></tr>
+<tr><td>-kill-task <task-id></td><td>Kills the task. Killed tasks are NOT counted against failed attempts.</td></tr>
+<tr><td>-fail-task <task-id></td><td>Fails the task. Failed tasks are counted against failed attempts.</td></tr>
+</table>
+
+ <subsection name="Example: Pi Calculation"></subsection>
+ <p>Here is a BSP-based Pi calculation example, including the code that submits it to a Hama cluster:</p>
+ <pre class="green">
+public class PiEstimator {
+ private static Path TMP_OUTPUT = new Path("/tmp/pi-" + System.currentTimeMillis());
+
+ public static class MyEstimator extends
+ BSP<NullWritable, NullWritable, Text, DoubleWritable> {
+ public static final Log LOG = LogFactory.getLog(MyEstimator.class);
+ private String masterTask;
+ private static final int iterations = 10000;
+
+ @Override
+ public void bsp(
+ BSPPeer<NullWritable, NullWritable, Text, DoubleWritable> peer)
+ throws IOException, SyncException, InterruptedException {
+
+ int in = 0, out = 0;
+ for (int i = 0; i < iterations; i++) {
+ double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0;
+ if ((Math.sqrt(x * x + y * y) < 1.0)) {
+ in++;
+ } else {
+ out++;
+ }
+ }
+
+ double data = 4.0 * (double) in / (double) iterations;
+ DoubleMessage estimate = new DoubleMessage(peer.getPeerName(), data);
+
+ peer.send(masterTask, estimate);
+ peer.sync();
+ }
+
+ @Override
+ public void setup(
+ BSPPeer<NullWritable, NullWritable, Text, DoubleWritable> peer)
+ throws IOException {
+ // Choose one as a master
+ this.masterTask = peer.getPeerName(peer.getNumPeers() / 2);
+ }
+
+ public void cleanup(
+ BSPPeer<NullWritable, NullWritable, Text, DoubleWritable> peer)
+ throws IOException {
+ if (peer.getPeerName().equals(masterTask)) {
+ double pi = 0.0;
+ int numPeers = peer.getNumCurrentMessages();
+ DoubleMessage received;
+ while ((received = (DoubleMessage) peer.getCurrentMessage()) != null) {
+ pi += received.getData();
+ }
+
+ pi = pi / numPeers;
+ peer
+ .write(new Text("Estimated value of PI is"), new DoubleWritable(pi));
+ }
+ }
+ }
+
+ static void printOutput(HamaConfiguration conf) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus[] files = fs.listStatus(TMP_OUTPUT);
+ for (int i = 0; i < files.length; i++) {
+ if (files[i].getLen() > 0) {
+ FSDataInputStream in = fs.open(files[i].getPath());
+ IOUtils.copyBytes(in, System.out, conf, false);
+ in.close();
+ break;
+ }
+ }
+
+ fs.delete(TMP_OUTPUT, true);
+ }
+
+ public static void main(String[] args) throws InterruptedException,
+ IOException, ClassNotFoundException {
+ // BSP job configuration
+ HamaConfiguration conf = new HamaConfiguration();
+
+ BSPJob bsp = new BSPJob(conf, PiEstimator.class);
+ // Set the job name
+ bsp.setJobName("Pi Estimation Example");
+ bsp.setBspClass(MyEstimator.class);
+ bsp.setInputFormat(NullInputFormat.class);
+ bsp.setOutputKeyClass(Text.class);
+ bsp.setOutputValueClass(DoubleWritable.class);
+ bsp.setOutputFormat(TextOutputFormat.class);
+ FileOutputFormat.setOutputPath(bsp, TMP_OUTPUT);
+
+ BSPJobClient jobClient = new BSPJobClient(conf);
+ ClusterStatus cluster = jobClient.getClusterStatus(true);
+
+ if (args.length > 0) {
+ bsp.setNumBspTask(Integer.parseInt(args[0]));
+ } else {
+ // Set to maximum
+ bsp.setNumBspTask(cluster.getMaxTasks());
+ }
+
+ long startTime = System.currentTimeMillis();
+ if (bsp.waitForCompletion(true)) {
+ printOutput(conf);
+ System.out.println("Job Finished in "
+ + (double) (System.currentTimeMillis() - startTime) / 1000.0
+ + " seconds");
+ }
+ }
+}</pre>
</body>
</document>
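
For readers who want to try the superstep structure from the tutorial above without a Hama cluster, the following is a minimal plain-Java sketch of the Pi example. It is not part of this commit and does not use the Hama API: threads stand in for peers, a ConcurrentLinkedQueue stands in for the master's message queue, and a java.util.concurrent.CyclicBarrier stands in for peer.sync(). The names BspPiSketch, estimate, and inbox are hypothetical and chosen only for illustration.

```java
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CyclicBarrier;

// Hypothetical stand-alone simulation; none of these names come from Hama.
final class BspPiSketch {

  /** Runs numPeers simulated peers through one superstep and returns the averaged estimate. */
  static double estimate(int numPeers, int iterations, long seed)
      throws InterruptedException {
    // Assumption: a shared queue plays the role of the master's incoming messages.
    ConcurrentLinkedQueue<Double> inbox = new ConcurrentLinkedQueue<>();
    // Assumption: the barrier plays the role of peer.sync().
    CyclicBarrier barrier = new CyclicBarrier(numPeers);
    double[] result = new double[1];
    int master = numPeers / 2; // same choice of master as setup() in the tutorial

    Thread[] peers = new Thread[numPeers];
    for (int p = 0; p < numPeers; p++) {
      final int id = p;
      peers[p] = new Thread(() -> {
        // Phase 1: local computation (Monte Carlo sampling in the square [-1, 1] x [-1, 1]).
        Random rnd = new Random(seed + id);
        int in = 0;
        for (int i = 0; i < iterations; i++) {
          double x = 2.0 * rnd.nextDouble() - 1.0;
          double y = 2.0 * rnd.nextDouble() - 1.0;
          if (x * x + y * y < 1.0) {
            in++;
          }
        }
        // Phase 2: communication (send the local estimate to the master).
        inbox.add(4.0 * in / iterations);
        // Phase 3: barrier synchronization (wait until every peer has sent).
        try {
          barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
          throw new IllegalStateException(e);
        }
        // After the barrier, the master averages what it received, as cleanup() does.
        if (id == master) {
          double sum = 0.0;
          for (double d : inbox) {
            sum += d;
          }
          result[0] = sum / inbox.size();
        }
      });
      peers[p].start();
    }
    for (Thread t : peers) {
      t.join();
    }
    return result[0];
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("Estimated value of PI is " + estimate(4, 100000, 42L));
  }
}
```

The barrier is what makes the averaging step safe: the master reads the queue only after every peer has added its value, which mirrors the guarantee that messages sent before sync() are available once the barrier has been passed.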