You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2011/02/08 14:10:12 UTC
svn commit: r1068367 - in /incubator/hama/trunk: CHANGES.txt
src/examples/org/apache/hama/examples/PiEstimator.java
src/examples/org/apache/hama/examples/SerializePrinting.java
Author: edwardyoon
Date: Tue Feb 8 13:10:12 2011
New Revision: 1068367
URL: http://svn.apache.org/viewvc?rev=1068367&view=rev
Log:
Improvement of lack of information about the output of examples
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1068367&r1=1068366&r2=1068367&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Feb 8 13:10:12 2011
@@ -51,6 +51,7 @@ Trunk (unreleased changes)
IMPROVEMENTS
+ HAMA-351: Improvement of lack of info- about the output of examples (edwardyoon)
HAMA-348: Remove hard-coded javaOpts (edwardyoon)
HAMA-347: Add implementation of umbilical interface (edwardyoon)
HAMA-346: Modify MniCluster so that developers can benefit when testing using Junit
Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=1068367&r1=1068366&r2=1068367&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java Tue Feb 8 13:10:12 2011
@@ -22,18 +22,24 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPJob;
import org.apache.hama.bsp.BSPJobClient;
import org.apache.hama.bsp.BSPMessage;
-import org.apache.hama.bsp.ClusterStatus;
import org.apache.hama.bsp.BSPPeerProtocol;
+import org.apache.hama.bsp.ClusterStatus;
import org.apache.hama.util.Bytes;
import org.apache.zookeeper.KeeperException;
public class PiEstimator {
private static String MASTER_TASK = "master.task.";
+ private static Path TMP_OUTPUT = new Path("/tmp/pi-example/output");
public static class MyEstimator extends BSP {
public static final Log LOG = LogFactory.getLog(MyEstimator.class);
@@ -41,8 +47,8 @@ public class PiEstimator {
private String masterTask;
private static final int iterations = 10000;
- public void bsp(BSPPeerProtocol bspPeer) throws IOException, KeeperException,
- InterruptedException {
+ public void bsp(BSPPeerProtocol bspPeer) throws IOException,
+ KeeperException, InterruptedException {
int in = 0, out = 0;
for (int i = 0; i < iterations; i++) {
double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0;
@@ -64,7 +70,7 @@ public class PiEstimator {
BSPMessage received;
while ((received = bspPeer.getCurrentMessage()) != null) {
LOG.info("Receives messages:" + Bytes.toDouble(received.getData()));
- if(pi == 0.0) {
+ if (pi == 0.0) {
pi = Bytes.toDouble(received.getData());
} else {
pi = (pi + Bytes.toDouble(received.getData())) / 2;
@@ -72,7 +78,13 @@ public class PiEstimator {
}
if (pi != 0.0) {
- LOG.info("\nEstimated value of PI is " + pi);
+ FileSystem fileSys = FileSystem.get(conf);
+
+ SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+ TMP_OUTPUT, DoubleWritable.class, DoubleWritable.class,
+ CompressionType.NONE);
+ writer.append(new DoubleWritable(pi), new DoubleWritable(0));
+ writer.close();
}
}
@@ -96,26 +108,41 @@ public class PiEstimator {
// Set the job name
bsp.setJobName("pi estimation example");
bsp.setBspClass(MyEstimator.class);
-
+
BSPJobClient jobClient = new BSPJobClient(conf);
ClusterStatus cluster = jobClient.getClusterStatus(true);
-
- if(args.length > 0) {
+
+ if (args.length > 0) {
bsp.setNumBspTask(Integer.parseInt(args[0]));
} else {
// Set to maximum
bsp.setNumBspTask(cluster.getGroomServers());
}
-
+
// Choose one as a master
for (String peerName : cluster.getActiveGroomNames().values()) {
conf.set(MASTER_TASK, peerName);
break;
}
+ FileSystem fileSys = FileSystem.get(conf);
+ if (fileSys.exists(TMP_OUTPUT)) {
+ fileSys.delete(TMP_OUTPUT, true);
+ }
+
long startTime = System.currentTimeMillis();
BSPJobClient.runJob(bsp);
- System.out.println("Job Finished in "+
- (double)(System.currentTimeMillis() - startTime)/1000.0 + " seconds");
+ System.out.println("Job Finished in "
+ + (double) (System.currentTimeMillis() - startTime) / 1000.0
+ + " seconds");
+
+ SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, TMP_OUTPUT,
+ conf);
+ DoubleWritable output = new DoubleWritable();
+ DoubleWritable zero = new DoubleWritable();
+ reader.next(output, zero);
+ reader.close();
+
+ System.out.println("Estimated value of PI is " + output);
}
}
Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java?rev=1068367&r1=1068366&r2=1068367&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java Tue Feb 8 13:10:12 2011
@@ -18,10 +18,17 @@
package org.apache.hama.examples;
import java.io.IOException;
+import java.util.Date;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPJob;
@@ -31,23 +38,34 @@ import org.apache.hama.bsp.ClusterStatus
import org.apache.zookeeper.KeeperException;
public class SerializePrinting {
-
+ private static String TMP_OUTPUT = "/tmp/test-example/";
+
public static class HelloBSP extends BSP {
public static final Log LOG = LogFactory.getLog(HelloBSP.class);
private Configuration conf;
private final static int PRINT_INTERVAL = 5000;
- public void bsp(BSPPeerProtocol bspPeer) throws IOException, KeeperException,
- InterruptedException {
+ public void bsp(BSPPeerProtocol bspPeer) throws IOException,
+ KeeperException, InterruptedException {
int num = Integer.parseInt(conf.get("bsp.peers.num"));
+ FileSystem fileSys = FileSystem.get(conf);
int i = 0;
for (String otherPeer : bspPeer.getAllPeerNames()) {
if (bspPeer.getPeerName().equals(otherPeer)) {
LOG.info("Hello BSP from " + (i + 1) + " of " + num + ": "
+ bspPeer.getPeerName());
+
+ SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+ new Path(TMP_OUTPUT + i), LongWritable.class, Text.class,
+ CompressionType.NONE);
+ writer.append(new LongWritable(System.currentTimeMillis()), new Text(
+ "Hello BSP from " + (i + 1) + " of " + num + ": "
+ + bspPeer.getPeerName()));
+ writer.close();
+
}
-
+
Thread.sleep(PRINT_INTERVAL);
bspPeer.sync();
i++;
@@ -73,12 +91,27 @@ public class SerializePrinting {
// Set the job name
bsp.setJobName("serialize printing");
bsp.setBspClass(HelloBSP.class);
-
+
// Set the task size as a number of GroomServer
BSPJobClient jobClient = new BSPJobClient(conf);
ClusterStatus cluster = jobClient.getClusterStatus(false);
bsp.setNumBspTask(cluster.getGroomServers());
-
+
+ FileSystem fileSys = FileSystem.get(conf);
+ if (fileSys.exists(new Path(TMP_OUTPUT))) {
+ fileSys.delete(new Path(TMP_OUTPUT), true);
+ }
BSPJobClient.runJob(bsp);
+
+ System.out.println("Each task printed the \"Hello World\" as below:");
+ for (int i = 0; i < cluster.getGroomServers(); i++) {
+ SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, new Path(
+ TMP_OUTPUT + i), conf);
+ LongWritable timestamp = new LongWritable();
+ Text message = new Text();
+ reader.next(timestamp, message);
+ System.out.println(new Date(timestamp.get()) + ": " + message);
+ reader.close();
+ }
}
}