You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by Apache Wiki <wi...@apache.org> on 2011/02/08 14:11:59 UTC
[Hama Wiki] Trivial Update of "PiEstimator" by edwardyoon
Dear Wiki user,
You have subscribed to a wiki page or wiki category on "Hama Wiki" for change notification.
The "PiEstimator" page has been changed by edwardyoon.
http://wiki.apache.org/hama/PiEstimator?action=diff&rev1=7&rev2=8
--------------------------------------------------
* One task acts as master and collects the results through the BSP communication interface.
{{{
- public class PiEstimator {
- private static String MASTER_TASK = "master.task.";
+ public class SerializePrinting {
+ private static String TMP_OUTPUT = "/tmp/test-example/";
- public static class MyEstimator extends BSP {
+ public static class HelloBSP extends BSP {
- public static final Log LOG = LogFactory.getLog(MyEstimator.class);
+ public static final Log LOG = LogFactory.getLog(HelloBSP.class);
private Configuration conf;
+ private final static int PRINT_INTERVAL = 5000;
- private String masterTask;
- private static final int iterations = 10000;
- @Override
- public void bsp(BSPPeer bspPeer) throws IOException, KeeperException,
+ public void bsp(BSPPeerProtocol bspPeer) throws IOException,
- InterruptedException {
+ KeeperException, InterruptedException {
+ int num = Integer.parseInt(conf.get("bsp.peers.num"));
+ FileSystem fileSys = FileSystem.get(conf);
+
- int in = 0, out = 0;
+ int i = 0;
- for (int i = 0; i < iterations; i++) {
- double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0;
- if ((Math.sqrt(x * x + y * y) < 1.0)) {
- in++;
- } else {
- out++;
+ for (String otherPeer : bspPeer.getAllPeerNames()) {
+ if (bspPeer.getPeerName().equals(otherPeer)) {
+ LOG.info("Hello BSP from " + (i + 1) + " of " + num + ": "
+ + bspPeer.getPeerName());
+
+ SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+ new Path(TMP_OUTPUT + i), LongWritable.class, Text.class,
+ CompressionType.NONE);
+ writer.append(new LongWritable(System.currentTimeMillis()), new Text(
+ "Hello BSP from " + (i + 1) + " of " + num + ": "
+ + bspPeer.getPeerName()));
+ writer.close();
+
}
- }
+ Thread.sleep(PRINT_INTERVAL);
- byte[] tagName = Bytes.toBytes(bspPeer.getPeerName());
- byte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations);
- BSPMessage estimate = new BSPMessage(tagName, myData);
-
- bspPeer.send(masterTask, estimate);
- bspPeer.sync();
+ bspPeer.sync();
-
- double pi = 0.0;
- BSPMessage received;
- while ((received = bspPeer.getCurrentMessage()) != null) {
- LOG.info("Receives messages:" + Bytes.toDouble(received.getData()));
- if(pi == 0.0) {
- pi = Bytes.toDouble(received.getData());
- } else {
- pi = (pi + Bytes.toDouble(received.getData())) / 2;
- }
+ i++;
- }
-
- if (pi != 0.0) {
- LOG.info("\nEstimated value of PI is " + pi);
}
}
- @Override
public Configuration getConf() {
return conf;
}
- @Override
public void setConf(Configuration conf) {
this.conf = conf;
- this.masterTask = conf.get(MASTER_TASK);
}
}
@@ -97, +84 @@
IOException {
// BSP job configuration
HamaConfiguration conf = new HamaConfiguration();
- // Execute locally
- // conf.set("bsp.master.address", "local");
- BSPJob bsp = new BSPJob(conf, PiEstimator.class);
+ BSPJob bsp = new BSPJob(conf, SerializePrinting.class);
// Set the job name
- bsp.setJobName("pi estimation example");
+ bsp.setJobName("serialize printing");
- bsp.setBspClass(MyEstimator.class);
+ bsp.setBspClass(HelloBSP.class);
+ // Set the task size as a number of GroomServer
BSPJobClient jobClient = new BSPJobClient(conf);
- ClusterStatus cluster = jobClient.getClusterStatus(true);
+ ClusterStatus cluster = jobClient.getClusterStatus(false);
- // Choose one as a master
- for (String peerName : cluster.getActiveGroomNames().values()) {
- conf.set(MASTER_TASK, peerName);
- break;
+ bsp.setNumBspTask(cluster.getGroomServers());
+
+ FileSystem fileSys = FileSystem.get(conf);
+ if (fileSys.exists(new Path(TMP_OUTPUT))) {
+ fileSys.delete(new Path(TMP_OUTPUT), true);
}
+ BSPJobClient.runJob(bsp);
- BSPJobClient.runJob(bsp);
+ System.out.println("Each task printed the \"Hello World\" as below:");
+ for (int i = 0; i < cluster.getGroomServers(); i++) {
+ SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, new Path(
+ TMP_OUTPUT + i), conf);
+ LongWritable timestamp = new LongWritable();
+ Text message = new Text();
+ reader.next(timestamp, message);
+ System.out.println(new Date(timestamp.get()) + ": " + message);
+ reader.close();
+ }
}
}
}}}