You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2011/02/15 02:17:00 UTC

svn commit: r1070716 - in /incubator/hama/trunk/src: examples/org/apache/hama/examples/PiEstimator.java examples/org/apache/hama/examples/SerializePrinting.java java/org/apache/hama/bsp/BSPJobClient.java

Author: edwardyoon
Date: Tue Feb 15 01:16:59 2011
New Revision: 1070716

URL: http://svn.apache.org/viewvc?rev=1070716&view=rev
Log:
Change to use waitForCompletion() method instead of runJob() method

Modified:
    incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
    incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=1070716&r1=1070715&r2=1070716&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java Tue Feb 15 01:16:59 2011
@@ -101,14 +101,15 @@ public class PiEstimator {
 
   }
 
+
   public static void main(String[] args) throws InterruptedException,
-      IOException {
+      IOException, ClassNotFoundException {
     // BSP job configuration
     HamaConfiguration conf = new HamaConfiguration();
 
     BSPJob bsp = new BSPJob(conf, PiEstimator.class);
     // Set the job name
-    bsp.setJobName("pi estimation example");
+    bsp.setJobName("Pi Estimation Example");
     bsp.setBspClass(MyEstimator.class);
 
     BSPJobClient jobClient = new BSPJobClient(conf);
@@ -128,23 +129,16 @@ public class PiEstimator {
     }
 
     FileSystem fileSys = FileSystem.get(conf);
-    if (fileSys.exists(TMP_OUTPUT)) {
-      fileSys.delete(TMP_OUTPUT, true);
-    }
+    initTempDir(fileSys);
 
     long startTime = System.currentTimeMillis();
-    BSPJobClient.runJob(bsp);
-    System.out.println("Job Finished in "
-        + (double) (System.currentTimeMillis() - startTime) / 1000.0
-        + " seconds");
-
-    SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, TMP_OUTPUT,
-        conf);
-    DoubleWritable output = new DoubleWritable();
-    DoubleWritable zero = new DoubleWritable();
-    reader.next(output, zero);
-    reader.close();
 
-    System.out.println("Estimated value of PI is " + output);
+    if (bsp.waitForCompletion(true)) {
+      printOutput(fileSys, conf);
+      
+      System.out.println("Job Finished in "
+          + (double) (System.currentTimeMillis() - startTime) / 1000.0
+          + " seconds");
+    }
   }
 }

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java?rev=1070716&r1=1070715&r2=1070716&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java Tue Feb 15 01:16:59 2011
@@ -44,24 +44,17 @@ public class SerializePrinting {
     public static final Log LOG = LogFactory.getLog(HelloBSP.class);
     private Configuration conf;
     private final static int PRINT_INTERVAL = 5000;
+    private FileSystem fileSys;
+    private int num;
 
     public void bsp(BSPPeerProtocol bspPeer) throws IOException,
         KeeperException, InterruptedException {
-      int num = Integer.parseInt(conf.get("bsp.peers.num"));
-      FileSystem fileSys = FileSystem.get(conf);
 
       int i = 0;
       for (String otherPeer : bspPeer.getAllPeerNames()) {
-        if (bspPeer.getPeerName().equals(otherPeer)) {
-
-          SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
-              new Path(TMP_OUTPUT + i), LongWritable.class, Text.class,
-              CompressionType.NONE);
-          writer.append(new LongWritable(System.currentTimeMillis()), new Text(
-              "Hello BSP from " + (i + 1) + " of " + num + ": "
-                  + bspPeer.getPeerName()));
-          writer.close();
-
+        String peerName = bspPeer.getPeerName();
+        if (peerName.equals(otherPeer)) {
+          writeLogToFile(peerName, i);
         }
 
         Thread.sleep(PRINT_INTERVAL);
@@ -70,24 +63,59 @@ public class SerializePrinting {
       }
     }
 
+    private void writeLogToFile(String string, int i) throws IOException {
+      SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+          new Path(TMP_OUTPUT + i), LongWritable.class, Text.class,
+          CompressionType.NONE);
+      writer.append(new LongWritable(System.currentTimeMillis()), new Text(
+          "Hello BSP from " + (i + 1) + " of " + num + ": " + string));
+      writer.close();
+    }
+
     public Configuration getConf() {
       return conf;
     }
 
     public void setConf(Configuration conf) {
       this.conf = conf;
+      num = Integer.parseInt(conf.get("bsp.peers.num"));
+      try {
+        fileSys = FileSystem.get(conf);
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
     }
 
   }
 
+  private static void printOutput(FileSystem fileSys, ClusterStatus cluster,
+      HamaConfiguration conf) throws IOException {
+    System.out.println("Each task printed the \"Hello World\" as below:");
+    for (int i = 0; i < cluster.getGroomServers(); i++) {
+      SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, new Path(
+          TMP_OUTPUT + i), conf);
+      LongWritable timestamp = new LongWritable();
+      Text message = new Text();
+      reader.next(timestamp, message);
+      System.out.println(new Date(timestamp.get()) + ": " + message);
+      reader.close();
+    }
+  }
+
+  private static void initTempDir(FileSystem fileSys) throws IOException {
+    if (fileSys.exists(new Path(TMP_OUTPUT))) {
+      fileSys.delete(new Path(TMP_OUTPUT), true);
+    }
+  }
+
   public static void main(String[] args) throws InterruptedException,
-      IOException {
+      IOException, ClassNotFoundException {
     // BSP job configuration
     HamaConfiguration conf = new HamaConfiguration();
 
     BSPJob bsp = new BSPJob(conf, SerializePrinting.class);
     // Set the job name
-    bsp.setJobName("serialize printing");
+    bsp.setJobName("Serialize Printing");
     bsp.setBspClass(HelloBSP.class);
 
     // Set the task size as a number of GroomServer
@@ -96,20 +124,11 @@ public class SerializePrinting {
     bsp.setNumBspTask(cluster.getGroomServers());
 
     FileSystem fileSys = FileSystem.get(conf);
-    if (fileSys.exists(new Path(TMP_OUTPUT))) {
-      fileSys.delete(new Path(TMP_OUTPUT), true);
-    }
-    BSPJobClient.runJob(bsp);
+    initTempDir(fileSys);
 
-    System.out.println("Each task printed the \"Hello World\" as below:");
-    for (int i = 0; i < cluster.getGroomServers(); i++) {
-      SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, new Path(
-          TMP_OUTPUT + i), conf);
-      LongWritable timestamp = new LongWritable();
-      Text message = new Text();
-      reader.next(timestamp, message);
-      System.out.println(new Date(timestamp.get()) + ": " + message);
-      reader.close();
+    if (bsp.waitForCompletion(true)) {
+      printOutput(fileSys, cluster, conf);
     }
   }
+
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java?rev=1070716&r1=1070715&r2=1070716&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java Tue Feb 15 01:16:59 2011
@@ -354,20 +354,19 @@ public class BSPJobClient extends Config
       throws IOException, InterruptedException {
 
     String lastReport = null;
-    BSPJobID jobId = job.getJobID();
-    LOG.info("Running job: " + jobId);
+    LOG.info("Running job: " + info.getJobName());
 
     while (!job.isComplete()) {
       Thread.sleep(1000);
-      String report = " bsp " + StringUtils.formatPercent(job.progress(), 0);
+      String report = "bsp: " + StringUtils.formatPercent(job.progress(), 0);
 
       if (!report.equals(lastReport)) {
-        LOG.info(report);
+        LOG.debug(report);
         lastReport = report;
       }
     }
 
-    LOG.info("Job complete: " + jobId);
+    LOG.info("The total number of supersteps: " + info.getSuperstepCount());
     return job.isSuccessful();
   }