You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2011/02/08 14:10:12 UTC

svn commit: r1068367 - in /incubator/hama/trunk: CHANGES.txt src/examples/org/apache/hama/examples/PiEstimator.java src/examples/org/apache/hama/examples/SerializePrinting.java

Author: edwardyoon
Date: Tue Feb  8 13:10:12 2011
New Revision: 1068367

URL: http://svn.apache.org/viewvc?rev=1068367&view=rev
Log:
Improvement of lack of information about the output of examples

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
    incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1068367&r1=1068366&r2=1068367&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Feb  8 13:10:12 2011
@@ -51,6 +51,7 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
  
+    HAMA-351: Improvement of lack of information about the output of examples (edwardyoon)
     HAMA-348: Remove hard-coded javaOpts (edwardyoon)
     HAMA-347: Add implementation of umbilical interface (edwardyoon)
     HAMA-346: Modify MniCluster so that developers can benefit when testing using Junit

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=1068367&r1=1068366&r2=1068367&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java Tue Feb  8 13:10:12 2011
@@ -22,18 +22,24 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPJob;
 import org.apache.hama.bsp.BSPJobClient;
 import org.apache.hama.bsp.BSPMessage;
-import org.apache.hama.bsp.ClusterStatus;
 import org.apache.hama.bsp.BSPPeerProtocol;
+import org.apache.hama.bsp.ClusterStatus;
 import org.apache.hama.util.Bytes;
 import org.apache.zookeeper.KeeperException;
 
 public class PiEstimator {
   private static String MASTER_TASK = "master.task.";
+  private static Path TMP_OUTPUT = new Path("/tmp/pi-example/output");
 
   public static class MyEstimator extends BSP {
     public static final Log LOG = LogFactory.getLog(MyEstimator.class);
@@ -41,8 +47,8 @@ public class PiEstimator {
     private String masterTask;
     private static final int iterations = 10000;
 
-    public void bsp(BSPPeerProtocol bspPeer) throws IOException, KeeperException,
-        InterruptedException {
+    public void bsp(BSPPeerProtocol bspPeer) throws IOException,
+        KeeperException, InterruptedException {
       int in = 0, out = 0;
       for (int i = 0; i < iterations; i++) {
         double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0;
@@ -64,7 +70,7 @@ public class PiEstimator {
       BSPMessage received;
       while ((received = bspPeer.getCurrentMessage()) != null) {
         LOG.info("Receives messages:" + Bytes.toDouble(received.getData()));
-        if(pi == 0.0) {
+        if (pi == 0.0) {
           pi = Bytes.toDouble(received.getData());
         } else {
           pi = (pi + Bytes.toDouble(received.getData())) / 2;
@@ -72,7 +78,13 @@ public class PiEstimator {
       }
 
       if (pi != 0.0) {
-        LOG.info("\nEstimated value of PI is " + pi);
+        FileSystem fileSys = FileSystem.get(conf);
+
+        SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+            TMP_OUTPUT, DoubleWritable.class, DoubleWritable.class,
+            CompressionType.NONE);
+        writer.append(new DoubleWritable(pi), new DoubleWritable(0));
+        writer.close();
       }
     }
 
@@ -96,26 +108,41 @@ public class PiEstimator {
     // Set the job name
     bsp.setJobName("pi estimation example");
     bsp.setBspClass(MyEstimator.class);
-    
+
     BSPJobClient jobClient = new BSPJobClient(conf);
     ClusterStatus cluster = jobClient.getClusterStatus(true);
-    
-    if(args.length > 0) {
+
+    if (args.length > 0) {
       bsp.setNumBspTask(Integer.parseInt(args[0]));
     } else {
       // Set to maximum
       bsp.setNumBspTask(cluster.getGroomServers());
     }
-    
+
     // Choose one as a master
     for (String peerName : cluster.getActiveGroomNames().values()) {
       conf.set(MASTER_TASK, peerName);
       break;
     }
 
+    FileSystem fileSys = FileSystem.get(conf);
+    if (fileSys.exists(TMP_OUTPUT)) {
+      fileSys.delete(TMP_OUTPUT, true);
+    }
+
     long startTime = System.currentTimeMillis();
     BSPJobClient.runJob(bsp);
-    System.out.println("Job Finished in "+
-        (double)(System.currentTimeMillis() - startTime)/1000.0 + " seconds");
+    System.out.println("Job Finished in "
+        + (double) (System.currentTimeMillis() - startTime) / 1000.0
+        + " seconds");
+
+    SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, TMP_OUTPUT,
+        conf);
+    DoubleWritable output = new DoubleWritable();
+    DoubleWritable zero = new DoubleWritable();
+    reader.next(output, zero);
+    reader.close();
+
+    System.out.println("Estimated value of PI is " + output);
   }
 }

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java?rev=1068367&r1=1068366&r2=1068367&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java Tue Feb  8 13:10:12 2011
@@ -18,10 +18,17 @@
 package org.apache.hama.examples;
 
 import java.io.IOException;
+import java.util.Date;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPJob;
@@ -31,23 +38,34 @@ import org.apache.hama.bsp.ClusterStatus
 import org.apache.zookeeper.KeeperException;
 
 public class SerializePrinting {
-  
+  private static String TMP_OUTPUT = "/tmp/test-example/";
+
   public static class HelloBSP extends BSP {
     public static final Log LOG = LogFactory.getLog(HelloBSP.class);
     private Configuration conf;
     private final static int PRINT_INTERVAL = 5000;
 
-    public void bsp(BSPPeerProtocol bspPeer) throws IOException, KeeperException,
-        InterruptedException {
+    public void bsp(BSPPeerProtocol bspPeer) throws IOException,
+        KeeperException, InterruptedException {
       int num = Integer.parseInt(conf.get("bsp.peers.num"));
+      FileSystem fileSys = FileSystem.get(conf);
 
       int i = 0;
       for (String otherPeer : bspPeer.getAllPeerNames()) {
         if (bspPeer.getPeerName().equals(otherPeer)) {
           LOG.info("Hello BSP from " + (i + 1) + " of " + num + ": "
               + bspPeer.getPeerName());
+
+          SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+              new Path(TMP_OUTPUT + i), LongWritable.class, Text.class,
+              CompressionType.NONE);
+          writer.append(new LongWritable(System.currentTimeMillis()), new Text(
+              "Hello BSP from " + (i + 1) + " of " + num + ": "
+                  + bspPeer.getPeerName()));
+          writer.close();
+
         }
-        
+
         Thread.sleep(PRINT_INTERVAL);
         bspPeer.sync();
         i++;
@@ -73,12 +91,27 @@ public class SerializePrinting {
     // Set the job name
     bsp.setJobName("serialize printing");
     bsp.setBspClass(HelloBSP.class);
-    
+
     // Set the task size as a number of GroomServer
     BSPJobClient jobClient = new BSPJobClient(conf);
     ClusterStatus cluster = jobClient.getClusterStatus(false);
     bsp.setNumBspTask(cluster.getGroomServers());
-    
+
+    FileSystem fileSys = FileSystem.get(conf);
+    if (fileSys.exists(new Path(TMP_OUTPUT))) {
+      fileSys.delete(new Path(TMP_OUTPUT), true);
+    }
     BSPJobClient.runJob(bsp);
+
+    System.out.println("Each task printed the \"Hello World\" as below:");
+    for (int i = 0; i < cluster.getGroomServers(); i++) {
+      SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, new Path(
+          TMP_OUTPUT + i), conf);
+      LongWritable timestamp = new LongWritable();
+      Text message = new Text();
+      reader.next(timestamp, message);
+      System.out.println(new Date(timestamp.get()) + ": " + message);
+      reader.close();
+    }
   }
 }