You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2011/02/15 02:17:00 UTC
svn commit: r1070716 - in /incubator/hama/trunk/src:
examples/org/apache/hama/examples/PiEstimator.java
examples/org/apache/hama/examples/SerializePrinting.java
java/org/apache/hama/bsp/BSPJobClient.java
Author: edwardyoon
Date: Tue Feb 15 01:16:59 2011
New Revision: 1070716
URL: http://svn.apache.org/viewvc?rev=1070716&view=rev
Log:
Change the examples to use the BSPJob.waitForCompletion() method instead of the BSPJobClient.runJob() method
Modified:
incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=1070716&r1=1070715&r2=1070716&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java Tue Feb 15 01:16:59 2011
@@ -101,14 +101,15 @@ public class PiEstimator {
}
+
public static void main(String[] args) throws InterruptedException,
- IOException {
+ IOException, ClassNotFoundException {
// BSP job configuration
HamaConfiguration conf = new HamaConfiguration();
BSPJob bsp = new BSPJob(conf, PiEstimator.class);
// Set the job name
- bsp.setJobName("pi estimation example");
+ bsp.setJobName("Pi Estimation Example");
bsp.setBspClass(MyEstimator.class);
BSPJobClient jobClient = new BSPJobClient(conf);
@@ -128,23 +129,16 @@ public class PiEstimator {
}
FileSystem fileSys = FileSystem.get(conf);
- if (fileSys.exists(TMP_OUTPUT)) {
- fileSys.delete(TMP_OUTPUT, true);
- }
+ initTempDir(fileSys);
long startTime = System.currentTimeMillis();
- BSPJobClient.runJob(bsp);
- System.out.println("Job Finished in "
- + (double) (System.currentTimeMillis() - startTime) / 1000.0
- + " seconds");
-
- SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, TMP_OUTPUT,
- conf);
- DoubleWritable output = new DoubleWritable();
- DoubleWritable zero = new DoubleWritable();
- reader.next(output, zero);
- reader.close();
- System.out.println("Estimated value of PI is " + output);
+ if (bsp.waitForCompletion(true)) {
+ printOutput(fileSys, conf);
+
+ System.out.println("Job Finished in "
+ + (double) (System.currentTimeMillis() - startTime) / 1000.0
+ + " seconds");
+ }
}
}
Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java?rev=1070716&r1=1070715&r2=1070716&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java Tue Feb 15 01:16:59 2011
@@ -44,24 +44,17 @@ public class SerializePrinting {
public static final Log LOG = LogFactory.getLog(HelloBSP.class);
private Configuration conf;
private final static int PRINT_INTERVAL = 5000;
+ private FileSystem fileSys;
+ private int num;
public void bsp(BSPPeerProtocol bspPeer) throws IOException,
KeeperException, InterruptedException {
- int num = Integer.parseInt(conf.get("bsp.peers.num"));
- FileSystem fileSys = FileSystem.get(conf);
int i = 0;
for (String otherPeer : bspPeer.getAllPeerNames()) {
- if (bspPeer.getPeerName().equals(otherPeer)) {
-
- SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
- new Path(TMP_OUTPUT + i), LongWritable.class, Text.class,
- CompressionType.NONE);
- writer.append(new LongWritable(System.currentTimeMillis()), new Text(
- "Hello BSP from " + (i + 1) + " of " + num + ": "
- + bspPeer.getPeerName()));
- writer.close();
-
+ String peerName = bspPeer.getPeerName();
+ if (peerName.equals(otherPeer)) {
+ writeLogToFile(peerName, i);
}
Thread.sleep(PRINT_INTERVAL);
@@ -70,24 +63,59 @@ public class SerializePrinting {
}
}
+ private void writeLogToFile(String string, int i) throws IOException {
+ SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+ new Path(TMP_OUTPUT + i), LongWritable.class, Text.class,
+ CompressionType.NONE);
+ writer.append(new LongWritable(System.currentTimeMillis()), new Text(
+ "Hello BSP from " + (i + 1) + " of " + num + ": " + string));
+ writer.close();
+ }
+
public Configuration getConf() {
return conf;
}
public void setConf(Configuration conf) {
this.conf = conf;
+ num = Integer.parseInt(conf.get("bsp.peers.num"));
+ try {
+ fileSys = FileSystem.get(conf);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
}
}
+ private static void printOutput(FileSystem fileSys, ClusterStatus cluster,
+ HamaConfiguration conf) throws IOException {
+ System.out.println("Each task printed the \"Hello World\" as below:");
+ for (int i = 0; i < cluster.getGroomServers(); i++) {
+ SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, new Path(
+ TMP_OUTPUT + i), conf);
+ LongWritable timestamp = new LongWritable();
+ Text message = new Text();
+ reader.next(timestamp, message);
+ System.out.println(new Date(timestamp.get()) + ": " + message);
+ reader.close();
+ }
+ }
+
+ private static void initTempDir(FileSystem fileSys) throws IOException {
+ if (fileSys.exists(new Path(TMP_OUTPUT))) {
+ fileSys.delete(new Path(TMP_OUTPUT), true);
+ }
+ }
+
public static void main(String[] args) throws InterruptedException,
- IOException {
+ IOException, ClassNotFoundException {
// BSP job configuration
HamaConfiguration conf = new HamaConfiguration();
BSPJob bsp = new BSPJob(conf, SerializePrinting.class);
// Set the job name
- bsp.setJobName("serialize printing");
+ bsp.setJobName("Serialize Printing");
bsp.setBspClass(HelloBSP.class);
// Set the task size as a number of GroomServer
@@ -96,20 +124,11 @@ public class SerializePrinting {
bsp.setNumBspTask(cluster.getGroomServers());
FileSystem fileSys = FileSystem.get(conf);
- if (fileSys.exists(new Path(TMP_OUTPUT))) {
- fileSys.delete(new Path(TMP_OUTPUT), true);
- }
- BSPJobClient.runJob(bsp);
+ initTempDir(fileSys);
- System.out.println("Each task printed the \"Hello World\" as below:");
- for (int i = 0; i < cluster.getGroomServers(); i++) {
- SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, new Path(
- TMP_OUTPUT + i), conf);
- LongWritable timestamp = new LongWritable();
- Text message = new Text();
- reader.next(timestamp, message);
- System.out.println(new Date(timestamp.get()) + ": " + message);
- reader.close();
+ if (bsp.waitForCompletion(true)) {
+ printOutput(fileSys, cluster, conf);
}
}
+
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java?rev=1070716&r1=1070715&r2=1070716&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java Tue Feb 15 01:16:59 2011
@@ -354,20 +354,19 @@ public class BSPJobClient extends Config
throws IOException, InterruptedException {
String lastReport = null;
- BSPJobID jobId = job.getJobID();
- LOG.info("Running job: " + jobId);
+ LOG.info("Running job: " + info.getJobName());
while (!job.isComplete()) {
Thread.sleep(1000);
- String report = " bsp " + StringUtils.formatPercent(job.progress(), 0);
+ String report = "bsp: " + StringUtils.formatPercent(job.progress(), 0);
if (!report.equals(lastReport)) {
- LOG.info(report);
+ LOG.debug(report);
lastReport = report;
}
}
- LOG.info("Job complete: " + jobId);
+ LOG.info("The total number of supersteps: " + info.getSuperstepCount());
return job.isSuccessful();
}