You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2012/01/05 06:08:11 UTC

svn commit: r1227483 - /incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java

Author: edwardyoon
Date: Thu Jan  5 05:08:11 2012
New Revision: 1227483

URL: http://svn.apache.org/viewvc?rev=1227483&view=rev
Log:
Trivial changes

Modified:
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java?rev=1227483&r1=1227482&r2=1227483&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java Thu Jan  5 05:08:11 2012
@@ -22,7 +22,14 @@ import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPJob;
@@ -30,20 +37,21 @@ import org.apache.hama.bsp.BSPMessage;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.Combiner;
+import org.apache.hama.bsp.FileOutputFormat;
 import org.apache.hama.bsp.IntegerMessage;
 import org.apache.hama.bsp.NullInputFormat;
-import org.apache.hama.bsp.NullOutputFormat;
+import org.apache.hama.bsp.TextOutputFormat;
 import org.apache.hama.bsp.sync.SyncException;
 
 public class CombineExample {
-
+  private static Path TMP_OUTPUT = new Path("/tmp/combine-" + System.currentTimeMillis());
+  
   public static class MyBSP extends
-      BSP<NullWritable, NullWritable, NullWritable, NullWritable> {
+      BSP<NullWritable, NullWritable, Text, IntWritable> {
     public static final Log LOG = LogFactory.getLog(MyBSP.class);
 
     @Override
-    public void bsp(
-        BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable> peer)
+    public void bsp(BSPPeer<NullWritable, NullWritable, Text, IntWritable> peer)
         throws IOException, SyncException, InterruptedException {
       for (String peerName : peer.getAllPeerNames()) {
         peer.send(peerName, new IntegerMessage(peer.getPeerName(), 1));
@@ -54,7 +62,8 @@ public class CombineExample {
 
       IntegerMessage received;
       while ((received = (IntegerMessage) peer.getCurrentMessage()) != null) {
-        LOG.info(received.getTag() + ": " + received.getData());
+        peer.write(new Text(received.getTag()),
+            new IntWritable(received.getData()));
       }
     }
 
@@ -72,10 +81,24 @@ public class CombineExample {
         sum += ((IntegerMessage) it.next()).getData();
       }
 
-      bundle.addMessage(new IntegerMessage("Sum", sum));
+      bundle.addMessage(new IntegerMessage("Sum = ", sum));
       return bundle;
     }
+  }
 
+  static void printOutput(HamaConfiguration conf) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] files = fs.listStatus(TMP_OUTPUT);
+    for (int i = 0; i < files.length; i++) {
+      if (files[i].getLen() > 0) {
+        FSDataInputStream in = fs.open(files[i].getPath());
+        IOUtils.copyBytes(in, System.out, conf, false);
+        in.close();
+        break;
+      }
+    }
+
+    fs.delete(TMP_OUTPUT, true);
   }
 
   public static void main(String[] args) throws InterruptedException,
@@ -89,9 +112,19 @@ public class CombineExample {
     bsp.setBspClass(MyBSP.class);
     bsp.setCombinerClass(SumCombiner.class);
     bsp.setInputFormat(NullInputFormat.class);
-    bsp.setOutputFormat(NullOutputFormat.class);
+    bsp.setOutputKeyClass(Text.class);
+    bsp.setOutputValueClass(IntWritable.class);
+    bsp.setOutputFormat(TextOutputFormat.class);
+    FileOutputFormat.setOutputPath(bsp, TMP_OUTPUT);
     bsp.setNumBspTask(2);
 
-    bsp.waitForCompletion(true);
+    long startTime = System.currentTimeMillis();
+    if (bsp.waitForCompletion(true)) {
+      printOutput(conf);
+      System.out.println("Job Finished in "
+          + (double) (System.currentTimeMillis() - startTime) / 1000.0
+          + " seconds");
+    }
+
   }
 }