You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by Apache Wiki <wi...@apache.org> on 2011/02/08 14:11:59 UTC

[Hama Wiki] Trivial Update of "PiEstimator" by edwardyoon

Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Hama Wiki" for change notification.

The "PiEstimator" page has been changed by edwardyoon.
http://wiki.apache.org/hama/PiEstimator?action=diff&rev1=7&rev2=8

--------------------------------------------------

    * One task acts as master and collects the results through the BSP communication interface.
  
  {{{
- public class PiEstimator {
-   private static String MASTER_TASK = "master.task.";
+ public class SerializePrinting {
+   private static String TMP_OUTPUT = "/tmp/test-example/";
  
-   public static class MyEstimator extends BSP {
+   public static class HelloBSP extends BSP {
-     public static final Log LOG = LogFactory.getLog(MyEstimator.class);
+     public static final Log LOG = LogFactory.getLog(HelloBSP.class);
      private Configuration conf;
+     private final static int PRINT_INTERVAL = 5000;
-     private String masterTask;
-     private static final int iterations = 10000;
  
-     @Override
-     public void bsp(BSPPeer bspPeer) throws IOException, KeeperException,
+     public void bsp(BSPPeerProtocol bspPeer) throws IOException,
-         InterruptedException {
+         KeeperException, InterruptedException {
+       int num = Integer.parseInt(conf.get("bsp.peers.num"));
+       FileSystem fileSys = FileSystem.get(conf);
+ 
-       int in = 0, out = 0;
+       int i = 0;
-       for (int i = 0; i < iterations; i++) {
-         double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0;
-         if ((Math.sqrt(x * x + y * y) < 1.0)) {
-           in++;
-         } else {
-           out++;
+       for (String otherPeer : bspPeer.getAllPeerNames()) {
+         if (bspPeer.getPeerName().equals(otherPeer)) {
+           LOG.info("Hello BSP from " + (i + 1) + " of " + num + ": "
+               + bspPeer.getPeerName());
+ 
+           SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+               new Path(TMP_OUTPUT + i), LongWritable.class, Text.class,
+               CompressionType.NONE);
+           writer.append(new LongWritable(System.currentTimeMillis()), new Text(
+               "Hello BSP from " + (i + 1) + " of " + num + ": "
+                   + bspPeer.getPeerName()));
+           writer.close();
+ 
          }
-       }
  
+         Thread.sleep(PRINT_INTERVAL);
-       byte[] tagName = Bytes.toBytes(bspPeer.getPeerName());
-       byte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations);
-       BSPMessage estimate = new BSPMessage(tagName, myData);
- 
-       bspPeer.send(masterTask, estimate);
-       bspPeer.sync();
+         bspPeer.sync();
- 
-       double pi = 0.0;
-       BSPMessage received;
-       while ((received = bspPeer.getCurrentMessage()) != null) {
-         LOG.info("Receives messages:" + Bytes.toDouble(received.getData()));
-         if(pi == 0.0) {
-           pi = Bytes.toDouble(received.getData());
-         } else {
-           pi = (pi + Bytes.toDouble(received.getData())) / 2;
-         }
+         i++;
-       }
- 
-       if (pi != 0.0) {
-         LOG.info("\nEstimated value of PI is " + pi);
        }
      }
  
-     @Override
      public Configuration getConf() {
        return conf;
      }
  
-     @Override
      public void setConf(Configuration conf) {
        this.conf = conf;
-       this.masterTask = conf.get(MASTER_TASK);
      }
  
    }
@@ -97, +84 @@

        IOException {
      // BSP job configuration
      HamaConfiguration conf = new HamaConfiguration();
-     // Execute locally
-     // conf.set("bsp.master.address", "local");
  
-     BSPJob bsp = new BSPJob(conf, PiEstimator.class);
+     BSPJob bsp = new BSPJob(conf, SerializePrinting.class);
      // Set the job name
-     bsp.setJobName("pi estimation example");
+     bsp.setJobName("serialize printing");
-     bsp.setBspClass(MyEstimator.class);
+     bsp.setBspClass(HelloBSP.class);
  
+     // Set the number of BSP tasks to the number of GroomServers
      BSPJobClient jobClient = new BSPJobClient(conf);
-     ClusterStatus cluster = jobClient.getClusterStatus(true);
+     ClusterStatus cluster = jobClient.getClusterStatus(false);
-     // Choose one as a master
-     for (String peerName : cluster.getActiveGroomNames().values()) {
-       conf.set(MASTER_TASK, peerName);
-       break;
+     bsp.setNumBspTask(cluster.getGroomServers());
+ 
+     FileSystem fileSys = FileSystem.get(conf);
+     if (fileSys.exists(new Path(TMP_OUTPUT))) {
+       fileSys.delete(new Path(TMP_OUTPUT), true);
      }
+     BSPJobClient.runJob(bsp);
  
-     BSPJobClient.runJob(bsp);
+     System.out.println("Each task printed the \"Hello World\" as below:");
+     for (int i = 0; i < cluster.getGroomServers(); i++) {
+       SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, new Path(
+           TMP_OUTPUT + i), conf);
+       LongWritable timestamp = new LongWritable();
+       Text message = new Text();
+       reader.next(timestamp, message);
+       System.out.println(new Date(timestamp.get()) + ": " + message);
+       reader.close();
+     }
    }
  }
  }}}