You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2011/12/09 05:20:24 UTC
svn commit: r1212243 - in
/incubator/hama/trunk/core/src/test/java/org/apache/hama:
bsp/IOSerializePrinting.java bsp/TestBSPMasterGroomServer.java
bsp/TestCheckpoint.java bsp/TestIOJob.java bsp/TestLocalRunner.java
examples/ClassSerializePrinting.java
Author: edwardyoon
Date: Fri Dec 9 04:20:24 2011
New Revision: 1212243
URL: http://svn.apache.org/viewvc?rev=1212243&view=rev
Log:
HAMA-486 Add unit test to verify correctness of messages between supersteps
Removed:
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/IOSerializePrinting.java
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestIOJob.java
Modified:
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java
incubator/hama/trunk/core/src/test/java/org/apache/hama/examples/ClassSerializePrinting.java
Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1212243&r1=1212242&r2=1212243&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java Fri Dec 9 04:20:24 2011
@@ -22,19 +22,23 @@ package org.apache.hama.bsp;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hama.Constants;
import org.apache.hama.HamaCluster;
import org.apache.hama.HamaConfiguration;
+import org.apache.hama.examples.ClassSerializePrinting;
public class TestBSPMasterGroomServer extends HamaCluster {
private static Log LOG = LogFactory.getLog(TestBSPMasterGroomServer.class);
static String TMP_OUTPUT = "/tmp/test-example/";
+ static Path OUTPUT_PATH = new Path(TMP_OUTPUT + "serialout");
+
private HamaConfiguration configuration;
public TestBSPMasterGroomServer() {
@@ -56,12 +60,14 @@ public class TestBSPMasterGroomServer ex
public void testSubmitJob() throws Exception {
BSPJob bsp = new BSPJob(configuration,
- org.apache.hama.examples.ClassSerializePrinting.HelloBSP.class);
+ org.apache.hama.examples.ClassSerializePrinting.class);
bsp.setJobName("Test Serialize Printing");
- bsp
- .setBspClass(org.apache.hama.examples.ClassSerializePrinting.HelloBSP.class);
+ bsp.setBspClass(org.apache.hama.examples.ClassSerializePrinting.class);
+ bsp.setOutputFormat(SequenceFileOutputFormat.class);
+ bsp.setOutputKeyClass(IntWritable.class);
+ bsp.setOutputValueClass(Text.class);
+ bsp.setOutputPath(OUTPUT_PATH);
- // Set the task size as a number of GroomServer
BSPJobClient jobClient = new BSPJobClient(configuration);
configuration.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 6000);
ClusterStatus cluster = jobClient.getClusterStatus(false);
@@ -78,18 +84,32 @@ public class TestBSPMasterGroomServer ex
public static void checkOutput(FileSystem fileSys, Configuration conf,
int tasks) throws Exception {
- for (int i = 0; i < tasks; i++) {
- SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, new Path(
- TMP_OUTPUT + "part-0000" + i), conf);
- LongWritable timestamp = new LongWritable();
- Text message = new Text();
- reader.next(timestamp, message);
-
- LOG.info("output: " + message);
- assertTrue("Check if `Hello BSP' gets printed.", message.toString()
- .indexOf("Hello BSP from") >= 0);
- reader.close();
+ FileStatus[] listStatus = fileSys.listStatus(OUTPUT_PATH);
+ assertEquals(listStatus.length, tasks);
+ for (FileStatus status : listStatus) {
+ if (!status.isDir()) {
+ SequenceFile.Reader reader = new SequenceFile.Reader(fileSys,
+ status.getPath(), conf);
+ int superStep = 0;
+ int taskstep = 0;
+ IntWritable key = new IntWritable();
+ Text value = new Text();
+ /*
+ * The serialize printing task should write in each superstep
+ * "tasks"-times its superstep, along with the hostname.
+ */
+ while (reader.next(key, value)) {
+ assertEquals(superStep, key.get());
+ taskstep++;
+ if (taskstep % tasks == 0) {
+ superStep++;
+ }
+ }
+ // the maximum should be the number of supersteps defined in the task
+ assertEquals(superStep, ClassSerializePrinting.NUM_SUPERSTEPS);
+ }
}
+
fileSys.delete(new Path(TMP_OUTPUT), true);
}
Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1212243&r1=1212242&r2=1212243&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Fri Dec 9 04:20:24 2011
@@ -33,10 +33,11 @@ public class TestCheckpoint extends Test
static final String checkpointedDir = "checkpoint/job_201110302255_0001/0/";
+ @SuppressWarnings("unchecked")
public void testCheckpoint() throws Exception {
Configuration config = new HamaConfiguration();
FileSystem dfs = FileSystem.get(config);
- @SuppressWarnings("rawtypes")
+
BSPPeerImpl bspTask = new BSPPeerImpl(config, dfs);
assertNotNull("BSPPeerImpl should not be null.", bspTask);
if(dfs.mkdirs(new Path("checkpoint"))) {
Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java?rev=1212243&r1=1212242&r2=1212243&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java Fri Dec 9 04:20:24 2011
@@ -21,8 +21,7 @@ import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
@@ -34,15 +33,16 @@ public class TestLocalRunner extends Tes
conf.set("bsp.local.dir", "/tmp/hama-test");
BSPJob bsp = new BSPJob(new HamaConfiguration(conf));
bsp.setJobName("Test Serialize Printing with Output");
- bsp.setBspClass(IOSerializePrinting.class);
-
+
+ bsp.setBspClass(org.apache.hama.examples.ClassSerializePrinting.class);
+ bsp.setOutputFormat(SequenceFileOutputFormat.class);
+ bsp.setOutputKeyClass(IntWritable.class);
+ bsp.setOutputValueClass(Text.class);
+ bsp.setOutputPath(TestBSPMasterGroomServer.OUTPUT_PATH);
+
conf.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 600);
bsp.setNumBspTask(2);
bsp.setInputFormat(NullInputFormat.class);
- bsp.setOutputFormat(SequenceFileOutputFormat.class);
- bsp.setOutputKeyClass(LongWritable.class);
- bsp.setOutputValueClass(Text.class);
- bsp.setOutputPath(new Path(TestBSPMasterGroomServer.TMP_OUTPUT));
FileSystem fileSys = FileSystem.get(conf);
Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/examples/ClassSerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/examples/ClassSerializePrinting.java?rev=1212243&r1=1212242&r2=1212243&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/examples/ClassSerializePrinting.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/examples/ClassSerializePrinting.java Fri Dec 9 04:20:24 2011
@@ -19,68 +19,31 @@ package org.apache.hama.examples;
import java.io.IOException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.IntegerMessage;
import org.apache.hama.bsp.sync.SyncException;
-public class ClassSerializePrinting {
- private static String TMP_OUTPUT = "/tmp/test-example/";
+public class ClassSerializePrinting extends
+ BSP<NullWritable, NullWritable, IntWritable, Text> {
- public static class HelloBSP extends
- BSP<NullWritable, NullWritable, NullWritable, NullWritable> {
- public static final Log LOG = LogFactory.getLog(HelloBSP.class);
- private Configuration conf;
- private final static int PRINT_INTERVAL = 1000;
- private FileSystem fileSys;
- private int num;
-
- public void bsp(
- BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable> bspPeer)
- throws IOException, SyncException, InterruptedException {
+ public static final int NUM_SUPERSTEPS = 15;
- int i = 0;
+ @Override
+ public void bsp(BSPPeer<NullWritable, NullWritable, IntWritable, Text> bspPeer)
+ throws IOException, SyncException, InterruptedException {
+
+ for (int i = 0; i < NUM_SUPERSTEPS; i++) {
for (String otherPeer : bspPeer.getAllPeerNames()) {
- String peerName = bspPeer.getPeerName();
- if (peerName.equals(otherPeer)) {
- writeLogToFile(peerName, i);
- }
-
- Thread.sleep(PRINT_INTERVAL);
- bspPeer.sync();
- i++;
+ bspPeer.send(otherPeer, new IntegerMessage(bspPeer.getPeerName(), i));
}
- }
-
- private void writeLogToFile(String string, int i) throws IOException {
- SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
- new Path(TMP_OUTPUT, "part-0000" + i), LongWritable.class,
- Text.class, CompressionType.NONE);
- writer.append(new LongWritable(System.currentTimeMillis()), new Text(
- "Hello BSP from " + (i + 1) + " of " + num + ": " + string));
- writer.close();
- }
-
- public Configuration getConf() {
- return conf;
- }
-
- public void setConf(Configuration conf) {
- this.conf = conf;
- num = Integer.parseInt(conf.get("bsp.peers.num"));
- try {
- fileSys = FileSystem.get(conf);
- } catch (IOException e) {
- e.printStackTrace();
+ bspPeer.sync();
+ IntegerMessage msg = null;
+ while ((msg = (IntegerMessage) bspPeer.getCurrentMessage()) != null) {
+ bspPeer.write(new IntWritable(msg.getData()), new Text(msg.getTag()));
}
}
}