You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2012/01/05 06:08:11 UTC
svn commit: r1227483 -
/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java
Author: edwardyoon
Date: Thu Jan 5 05:08:11 2012
New Revision: 1227483
URL: http://svn.apache.org/viewvc?rev=1227483&view=rev
Log:
Trivial changes
Modified:
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java?rev=1227483&r1=1227482&r2=1227483&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java Thu Jan 5 05:08:11 2012
@@ -22,7 +22,14 @@ import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPJob;
@@ -30,20 +37,21 @@ import org.apache.hama.bsp.BSPMessage;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.Combiner;
+import org.apache.hama.bsp.FileOutputFormat;
import org.apache.hama.bsp.IntegerMessage;
import org.apache.hama.bsp.NullInputFormat;
-import org.apache.hama.bsp.NullOutputFormat;
+import org.apache.hama.bsp.TextOutputFormat;
import org.apache.hama.bsp.sync.SyncException;
public class CombineExample {
-
+ private static Path TMP_OUTPUT = new Path("/tmp/combine-" + System.currentTimeMillis());
+
public static class MyBSP extends
- BSP<NullWritable, NullWritable, NullWritable, NullWritable> {
+ BSP<NullWritable, NullWritable, Text, IntWritable> {
public static final Log LOG = LogFactory.getLog(MyBSP.class);
@Override
- public void bsp(
- BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable> peer)
+ public void bsp(BSPPeer<NullWritable, NullWritable, Text, IntWritable> peer)
throws IOException, SyncException, InterruptedException {
for (String peerName : peer.getAllPeerNames()) {
peer.send(peerName, new IntegerMessage(peer.getPeerName(), 1));
@@ -54,7 +62,8 @@ public class CombineExample {
IntegerMessage received;
while ((received = (IntegerMessage) peer.getCurrentMessage()) != null) {
- LOG.info(received.getTag() + ": " + received.getData());
+ peer.write(new Text(received.getTag()),
+ new IntWritable(received.getData()));
}
}
@@ -72,10 +81,24 @@ public class CombineExample {
sum += ((IntegerMessage) it.next()).getData();
}
- bundle.addMessage(new IntegerMessage("Sum", sum));
+ bundle.addMessage(new IntegerMessage("Sum = ", sum));
return bundle;
}
+ }
+ static void printOutput(HamaConfiguration conf) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus[] files = fs.listStatus(TMP_OUTPUT);
+ for (int i = 0; i < files.length; i++) {
+ if (files[i].getLen() > 0) {
+ FSDataInputStream in = fs.open(files[i].getPath());
+ IOUtils.copyBytes(in, System.out, conf, false);
+ in.close();
+ break;
+ }
+ }
+
+ fs.delete(TMP_OUTPUT, true);
}
public static void main(String[] args) throws InterruptedException,
@@ -89,9 +112,19 @@ public class CombineExample {
bsp.setBspClass(MyBSP.class);
bsp.setCombinerClass(SumCombiner.class);
bsp.setInputFormat(NullInputFormat.class);
- bsp.setOutputFormat(NullOutputFormat.class);
+ bsp.setOutputKeyClass(Text.class);
+ bsp.setOutputValueClass(IntWritable.class);
+ bsp.setOutputFormat(TextOutputFormat.class);
+ FileOutputFormat.setOutputPath(bsp, TMP_OUTPUT);
bsp.setNumBspTask(2);
- bsp.waitForCompletion(true);
+ long startTime = System.currentTimeMillis();
+ if (bsp.waitForCompletion(true)) {
+ printOutput(conf);
+ System.out.println("Job Finished in "
+ + (double) (System.currentTimeMillis() - startTime) / 1000.0
+ + " seconds");
+ }
+
}
}