You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by on 2012/01/26 09:43:57 UTC

svn commit: r1236074 - in /incubator/hama/trunk/src/site: resources/css/site.css xdoc/hama_bsp_tutorial.xml

Author: edwardyoon
Date: Thu Jan 26 08:43:57 2012
New Revision: 1236074

Add tutorial page.


Modified: incubator/hama/trunk/src/site/resources/css/site.css
--- incubator/hama/trunk/src/site/resources/css/site.css (original)
+++ incubator/hama/trunk/src/site/resources/css/site.css Thu Jan 26 08:43:57 2012
@@ -30,10 +30,20 @@ body a:hover {
   color: #888800;
-h2, h3, h4 {
+h2, h3, h4, h5 {
   color: #74240f;
+h5 {
+  font-size: 15px;
+  margin-top: -0.8em;
+table {
+  width: 95%;
+  margin-left:2em;
 ul {

Modified: incubator/hama/trunk/src/site/xdoc/hama_bsp_tutorial.xml
--- incubator/hama/trunk/src/site/xdoc/hama_bsp_tutorial.xml (original)
+++ incubator/hama/trunk/src/site/xdoc/hama_bsp_tutorial.xml Thu Jan 26 08:43:57 2012
@@ -23,8 +23,279 @@ xsi:schemaLocation="http://maven.apache.
     <section name="Hama BSP Tutorial"></section>
     <p>This document describes the Hama BSP framework and serves as a tutorial.</p>
     <subsection name="Overview"></subsection>
-    <subsection name="Inputs and Outputs"></subsection>
-    <subsection name="BSP user interfaces"></subsection>
-    <subsection name="Example: Pi calculation"></subsection>
+    <p>Hama provides a Pure BSP Bulk Synchronous Parallel Model for message passing and collective communication.
+    A BSP program consists of a sequence of supersteps. Each superstep consists of the following three phases:</p>
+    <ul>
+     <li>Local computation</li>
+     <li>Process communication</li>
+     <li>Barrier synchronization</li>
+    </ul>
+    <p>BSP programming enables you to write high-performance parallel computing algorithms for a wide range of scientific problems.</p>
+    <subsection name="Create your own BSP by extending BSP class"></subsection>
+    <p>The way to create your own BSP class is to create a class that extends the org.apache.hama.bsp.<b>BSP</b> class.
+    <br/>
+    The extending class must override the bsp() method, which is declared like this:
+    </p>
+    <pre class="green">
+  public abstract void bsp(BSPPeer&lt;K1, V1, K2, V2&gt; peer) throws IOException, 
+    SyncException, InterruptedException;</pre>
+    <p>
+    You will define the BSP program inside this bsp() method.
+    It is important to understand that it doesn't mean a single superstep. 
+    As described above, a BSP program consists of a sequence of supersteps. 
+    So it just gets called once, not all over again unlike Mapper or Reducer method.
+    <br/><br/>
+    <b>NOTE</b>: Optionally, there are also setup() and cleanup() which will be called at the beginning of your computation, 
+    respectively at the end of the computation. cleanup() is guranteed to run after the computation or in case of failure. 
+    You can simply override the methods you need from BSP class.
+    <br/><br/>
+    After your own BSP is created, you will need to configure a <b>BSPJob</b> and submit it to Hama cluster to execute a job.
+    The BSP job configuration and submission interfaces is almost the same as the MapReduce job configuration:</p>
+    <pre class="green">
+  HamaConfiguration conf = new HamaConfiguration();
+  BSPJob job = new BSPJob(conf, MyBSP.class);
+  job.setJobName("My BSP program");
+  job.setBspClass(MyBSP.class);
+  job.setInputFormat(NullInputFormat.class);
+  job.setOutputKeyClass(Text.class);
+  ...
+  job.waitForCompletion(true);</pre>
+    <p>See the below section for more detailed description of BSP user interfaces.</p>
+    <subsection name="User Interfaces"></subsection>
+    <h5>Inputs and Outputs</h5>
+    <p>When setting up a BSPJob, you can provide a Input/OutputFormat and Paths like this:</p>
+    <pre class="green">
+  job.setInputPath(new Path("/tmp/sequence.dat");
+  job.setInputFormat(org.apache.hama.bsp.SequenceFileInputFormat.class);
+  or,
+  SequenceFileInputFormat.addInputPath(job, new Path("/tmp/sequence.dat"));
+  or,
+  SequenceFileInputFormat.addInputPaths(job, "/tmp/seq1.dat,/tmp/seq2.dat,/tmp/seq3.dat");
+  job.setOutputKeyClass(Text.class);
+  job.setOutputValueClass(IntWritable.class);
+  job.setOutputFormat(TextOutputFormat.class);
+  FileOutputFormat.setOutputPath(job, new Path("/tmp/result"));
+  </pre>
+    <p>Then, you can read the input and write the output from the methods in BSP class which has "BSPPeer" which contains an communication, counters, and IO interfaces as parameter.
+    In this case we read a normal text file:</p>
+    <pre class="green">
+ @Override
+  public final void bsp(
+      BSPPeer&lt;LongWritable, Text, Text, LongWritable&gt; peer)
+      throws IOException, InterruptedException, SyncException {
+      // this method reads the next key value record from file
+      KeyValuePair&lt;LongWritable, Text&gt; pair = peer.readNext();
+      // the following lines do the same:
+      LongWritable key = new LongWritable();
+      Text value = new Text();
+      peer.readNext(key, value);
+      // write
+      peer.write(value, key);
+  }</pre>
+    <p>Consult the docs for more detail on events like end of file.
+    There is also a function which allows you to re-read the input from the beginning.
+    This snippet reads the input five times:
+    </p>
+    <pre class="green">
+  for(int i = 0; i &lt; 5; i++){
+    LongWritable key = new LongWritable();
+    Text value = new Text();
+    while (peer.readNext(key, value)) {
+       // read everything
+    }
+    // reopens the input
+    peer.reopenInput()
+  }</pre>
+    <h5>Communication</h5>
+    <p>Hama BSP provides simple but powerful communication APIs for many purposes. 
+    We tried to follow the standard library of BSP world as much as possible. 
+    The following table describes all the methods you can use:</p>
+<table align="center" border="0">
+<tr><td>send(String peerName, BSPMessage msg)</td><td>Sends a message to another peer.</td></tr>
+<tr><td>getCurrentMessage()</td><td>Returns a received message.</td></tr>
+<tr><td>getNumCurrentMessages()</td><td>Returns the number of received messages.</td></tr>
+<tr><td>sync()</td><td>Barrier synchronization.</td></tr>
+<tr><td>getPeerName()</td><td>Returns a peer’s hostname.</td></tr>
+<tr><td>getAllPeerNames()</td><td>Returns all peer’s hostname.</td></tr>
+<tr><td>getSuperstepCount()</td><td>Returns the count of supersteps</td></tr>
+    <p>The send() and all the other functions are very flexible. Here is an example that sends a message to all peers:</p>
+    <pre class="green">
+  @Override
+  public void bsp(
+      BSPPeer&lt;NullWritable, NullWritable, Text, DoubleWritable&gt; peer)
+      throws IOException, SyncException, InterruptedException {
+    for (String peerName : peer.getAllPeerNames()) {
+      peer.send(peerName, 
+        new LongMessage("Hello from " + peer.getPeerName(), System.currentTimeMillis()));
+    }
+    peer.sync();
+  }</pre>
+    <h5>Synchronization</h5>
+    <p>When all the processes have entered the barrier via the sync() method, 
+    the Hama proceeds to the next superstep. 
+    In the previous example, the BSP job will be finished by one synchronization 
+    after sending a message "Hello from ..." to all peers.
+    <br/><br/>
+    But, keep in mind that the sync() function is not the end of the BSP job. 
+    As was previously mentioned, all the communication functions are very flexible. 
+    For example, the sync() method also can be called in a for loop 
+    so that you can use to program the iterative methods sequentially:</p>
+    <pre class="green">
+  @Override
+  public void bsp(
+      BSPPeer&lt;NullWritable, NullWritable, Text, DoubleWritable&gt; peer)
+      throws IOException, SyncException, InterruptedException {
+    for (int i = 0; i &lt; 100; i++) {
+      // send some messages
+      peer.sync();
+    }
+  }</pre>
+    <subsection name="Shell Command Line Interfaces"></subsection>
+    <p>Hama provides several command for BSP job administration:</p>
+<table align="center" border="0">
+<tr><td>-submit &lt;job-file&gt;</td><td>Submits the job.</td></tr>
+<tr><td>-status &lt;job-id&gt;</td><td>Prints the job status.</td></tr>
+<tr><td>-kill &lt;job-id&gt;</td><td>Kills the job.</td></tr>
+<tr><td>-list [all]</td><td>-list all displays all jobs. -list displays only jobs which are yet to be completed.</td></tr>
+<tr><td>-list-active-grooms</td><td>Displays the list of active groom server in the cluster.</td></tr>
+<tr><td>-list-attempt-ids &lt;jobId&gt; &lt;task-state&gt;</td><td>Displays the list of tasks for a given job currently in a particular state (running or completed).</td></tr>
+<tr><td>-kill-task &lt;task-id&gt;</td><td>Kills the task. Killed tasks are NOT counted against failed attempts.</td></tr>
+<tr><td>-fail-task &lt;task-id&gt;</td><td>Fails the task. Failed tasks are counted against failed attempts.</td></tr>
+    <subsection name="Example: Pi Calculation"></subsection>
+    <p>Here is an BSP-based Pi Calculation example and submit it to Hama cluster:</p>
+    <pre class="green">
+public class PiEstimator {
+  private static Path TMP_OUTPUT = new Path("/tmp/pi-" + System.currentTimeMillis());
+  public static class MyEstimator extends
+      BSP&lt;NullWritable, NullWritable, Text, DoubleWritable&gt; {
+    public static final Log LOG = LogFactory.getLog(MyEstimator.class);
+    private String masterTask;
+    private static final int iterations = 10000;
+    @Override
+    public void bsp(
+        BSPPeer&lt;NullWritable, NullWritable, Text, DoubleWritable&gt; peer)
+        throws IOException, SyncException, InterruptedException {
+      int in = 0, out = 0;
+      for (int i = 0; i &lt; iterations; i++) {
+        double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0;
+        if ((Math.sqrt(x * x + y * y) &lt; 1.0)) {
+          in++;
+        } else {
+          out++;
+        }
+      }
+      double data = 4.0 * (double) in / (double) iterations;
+      DoubleMessage estimate = new DoubleMessage(peer.getPeerName(), data);
+      peer.send(masterTask, estimate);
+      peer.sync();
+    }
+    @Override
+    public void setup(
+        BSPPeer&lt;NullWritable, NullWritable, Text, DoubleWritable&gt; peer)
+        throws IOException {
+      // Choose one as a master
+      this.masterTask = peer.getPeerName(peer.getNumPeers() / 2);
+    }
+    public void cleanup(
+        BSPPeer&lt;NullWritable, NullWritable, Text, DoubleWritable&gt; peer)
+        throws IOException {
+      if (peer.getPeerName().equals(masterTask)) {
+        double pi = 0.0;
+        int numPeers = peer.getNumCurrentMessages();
+        DoubleMessage received;
+        while ((received = (DoubleMessage) peer.getCurrentMessage()) != null) {
+          pi += received.getData();
+        }
+        pi = pi / numPeers;
+        peer
+            .write(new Text("Estimated value of PI is"), new DoubleWritable(pi));
+      }
+    }
+  }
+  static void printOutput(HamaConfiguration conf) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] files = fs.listStatus(TMP_OUTPUT);
+    for (int i = 0; i &lt; files.length; i++) {
+      if (files[i].getLen() &gt; 0) {
+        FSDataInputStream in =[i].getPath());
+        IOUtils.copyBytes(in, System.out, conf, false);
+        in.close();
+        break;
+      }
+    }
+    fs.delete(TMP_OUTPUT, true);
+  }
+  public static void main(String[] args) throws InterruptedException,
+      IOException, ClassNotFoundException {
+    // BSP job configuration
+    HamaConfiguration conf = new HamaConfiguration();
+    BSPJob bsp = new BSPJob(conf, PiEstimator.class);
+    // Set the job name
+    bsp.setJobName("Pi Estimation Example");
+    bsp.setBspClass(MyEstimator.class);
+    bsp.setInputFormat(NullInputFormat.class);
+    bsp.setOutputKeyClass(Text.class);
+    bsp.setOutputValueClass(DoubleWritable.class);
+    bsp.setOutputFormat(TextOutputFormat.class);
+    FileOutputFormat.setOutputPath(bsp, TMP_OUTPUT);
+    BSPJobClient jobClient = new BSPJobClient(conf);
+    ClusterStatus cluster = jobClient.getClusterStatus(true);
+    if (args.length &gt; 0) {
+      bsp.setNumBspTask(Integer.parseInt(args[0]));
+    } else {
+      // Set to maximum
+      bsp.setNumBspTask(cluster.getMaxTasks());
+    }
+    long startTime = System.currentTimeMillis();
+    if (bsp.waitForCompletion(true)) {
+      printOutput(conf);
+      System.out.println("Job Finished in "
+          + (double) (System.currentTimeMillis() - startTime) / 1000.0
+          + " seconds");
+    }
+  }