You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2011/12/09 05:20:24 UTC

svn commit: r1212243 - in /incubator/hama/trunk/core/src/test/java/org/apache/hama: bsp/IOSerializePrinting.java bsp/TestBSPMasterGroomServer.java bsp/TestCheckpoint.java bsp/TestIOJob.java bsp/TestLocalRunner.java examples/ClassSerializePrinting.java

Author: edwardyoon
Date: Fri Dec  9 04:20:24 2011
New Revision: 1212243

URL: http://svn.apache.org/viewvc?rev=1212243&view=rev
Log:
HAMA-486 Add unit test to verify correctness of messages between supersteps

Removed:
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/IOSerializePrinting.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestIOJob.java
Modified:
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/examples/ClassSerializePrinting.java

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1212243&r1=1212242&r2=1212243&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java Fri Dec  9 04:20:24 2011
@@ -22,19 +22,23 @@ package org.apache.hama.bsp;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaCluster;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.examples.ClassSerializePrinting;
 
 public class TestBSPMasterGroomServer extends HamaCluster {
 
   private static Log LOG = LogFactory.getLog(TestBSPMasterGroomServer.class);
   static String TMP_OUTPUT = "/tmp/test-example/";
+  static Path OUTPUT_PATH = new Path(TMP_OUTPUT + "serialout");
+
   private HamaConfiguration configuration;
 
   public TestBSPMasterGroomServer() {
@@ -56,12 +60,14 @@ public class TestBSPMasterGroomServer ex
 
   public void testSubmitJob() throws Exception {
     BSPJob bsp = new BSPJob(configuration,
-        org.apache.hama.examples.ClassSerializePrinting.HelloBSP.class);
+        org.apache.hama.examples.ClassSerializePrinting.class);
     bsp.setJobName("Test Serialize Printing");
-    bsp
-        .setBspClass(org.apache.hama.examples.ClassSerializePrinting.HelloBSP.class);
+    bsp.setBspClass(org.apache.hama.examples.ClassSerializePrinting.class);
+    bsp.setOutputFormat(SequenceFileOutputFormat.class);
+    bsp.setOutputKeyClass(IntWritable.class);
+    bsp.setOutputValueClass(Text.class);
+    bsp.setOutputPath(OUTPUT_PATH);
 
-    // Set the task size as a number of GroomServer
     BSPJobClient jobClient = new BSPJobClient(configuration);
     configuration.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 6000);
     ClusterStatus cluster = jobClient.getClusterStatus(false);
@@ -78,18 +84,32 @@ public class TestBSPMasterGroomServer ex
 
   public static void checkOutput(FileSystem fileSys, Configuration conf,
       int tasks) throws Exception {
-    for (int i = 0; i < tasks; i++) {
-      SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, new Path(
-          TMP_OUTPUT + "part-0000" + i), conf);
-      LongWritable timestamp = new LongWritable();
-      Text message = new Text();
-      reader.next(timestamp, message);
-
-      LOG.info("output: " + message);
-      assertTrue("Check if `Hello BSP' gets printed.", message.toString()
-          .indexOf("Hello BSP from") >= 0);
-      reader.close();
+    FileStatus[] listStatus = fileSys.listStatus(OUTPUT_PATH);
+    assertEquals(listStatus.length, tasks);
+    for (FileStatus status : listStatus) {
+      if (!status.isDir()) {
+        SequenceFile.Reader reader = new SequenceFile.Reader(fileSys,
+            status.getPath(), conf);
+        int superStep = 0;
+        int taskstep = 0;
+        IntWritable key = new IntWritable();
+        Text value = new Text();
+        /*
+         * The serialize printing task should write in each superstep
+         * "tasks"-times its superstep, along with the hostname.
+         */
+        while (reader.next(key, value)) {
+          assertEquals(superStep, key.get());
+          taskstep++;
+          if (taskstep % tasks == 0) {
+            superStep++;
+          }
+        }
+        // the maximum should be the number of supersteps defined in the task
+        assertEquals(superStep, ClassSerializePrinting.NUM_SUPERSTEPS);
+      }
     }
+
     fileSys.delete(new Path(TMP_OUTPUT), true);
   }
 

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1212243&r1=1212242&r2=1212243&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Fri Dec  9 04:20:24 2011
@@ -33,10 +33,11 @@ public class TestCheckpoint extends Test
 
   static final String checkpointedDir = "checkpoint/job_201110302255_0001/0/";
 
+  @SuppressWarnings("unchecked")
   public void testCheckpoint() throws Exception {
     Configuration config = new HamaConfiguration();
     FileSystem dfs = FileSystem.get(config);
-    @SuppressWarnings("rawtypes")
+
     BSPPeerImpl bspTask = new BSPPeerImpl(config, dfs);
     assertNotNull("BSPPeerImpl should not be null.", bspTask);
     if(dfs.mkdirs(new Path("checkpoint"))) {

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java?rev=1212243&r1=1212242&r2=1212243&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java Fri Dec  9 04:20:24 2011
@@ -21,8 +21,7 @@ import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
@@ -34,15 +33,16 @@ public class TestLocalRunner extends Tes
     conf.set("bsp.local.dir", "/tmp/hama-test");
     BSPJob bsp = new BSPJob(new HamaConfiguration(conf));
     bsp.setJobName("Test Serialize Printing with Output");
-    bsp.setBspClass(IOSerializePrinting.class);
-
+    
+    bsp.setBspClass(org.apache.hama.examples.ClassSerializePrinting.class);
+    bsp.setOutputFormat(SequenceFileOutputFormat.class);
+    bsp.setOutputKeyClass(IntWritable.class);
+    bsp.setOutputValueClass(Text.class);
+    bsp.setOutputPath(TestBSPMasterGroomServer.OUTPUT_PATH);
+    
     conf.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 600);
     bsp.setNumBspTask(2);
     bsp.setInputFormat(NullInputFormat.class);
-    bsp.setOutputFormat(SequenceFileOutputFormat.class);
-    bsp.setOutputKeyClass(LongWritable.class);
-    bsp.setOutputValueClass(Text.class);
-    bsp.setOutputPath(new Path(TestBSPMasterGroomServer.TMP_OUTPUT));
 
     FileSystem fileSys = FileSystem.get(conf);
 

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/examples/ClassSerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/examples/ClassSerializePrinting.java?rev=1212243&r1=1212242&r2=1212243&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/examples/ClassSerializePrinting.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/examples/ClassSerializePrinting.java Fri Dec  9 04:20:24 2011
@@ -19,68 +19,31 @@ package org.apache.hama.examples;
 
 import java.io.IOException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.IntegerMessage;
 import org.apache.hama.bsp.sync.SyncException;
 
-public class ClassSerializePrinting {
-  private static String TMP_OUTPUT = "/tmp/test-example/";
+public class ClassSerializePrinting extends
+    BSP<NullWritable, NullWritable, IntWritable, Text> {
 
-  public static class HelloBSP extends
-      BSP<NullWritable, NullWritable, NullWritable, NullWritable> {
-    public static final Log LOG = LogFactory.getLog(HelloBSP.class);
-    private Configuration conf;
-    private final static int PRINT_INTERVAL = 1000;
-    private FileSystem fileSys;
-    private int num;
-
-    public void bsp(
-        BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable> bspPeer)
-        throws IOException, SyncException, InterruptedException {
+  public static final int NUM_SUPERSTEPS = 15;
 
-      int i = 0;
+  @Override
+  public void bsp(BSPPeer<NullWritable, NullWritable, IntWritable, Text> bspPeer)
+      throws IOException, SyncException, InterruptedException {
+
+    for (int i = 0; i < NUM_SUPERSTEPS; i++) {
       for (String otherPeer : bspPeer.getAllPeerNames()) {
-        String peerName = bspPeer.getPeerName();
-        if (peerName.equals(otherPeer)) {
-          writeLogToFile(peerName, i);
-        }
-
-        Thread.sleep(PRINT_INTERVAL);
-        bspPeer.sync();
-        i++;
+        bspPeer.send(otherPeer, new IntegerMessage(bspPeer.getPeerName(), i));
       }
-    }
-
-    private void writeLogToFile(String string, int i) throws IOException {
-      SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
-          new Path(TMP_OUTPUT, "part-0000" + i), LongWritable.class,
-          Text.class, CompressionType.NONE);
-      writer.append(new LongWritable(System.currentTimeMillis()), new Text(
-          "Hello BSP from " + (i + 1) + " of " + num + ": " + string));
-      writer.close();
-    }
-
-    public Configuration getConf() {
-      return conf;
-    }
-
-    public void setConf(Configuration conf) {
-      this.conf = conf;
-      num = Integer.parseInt(conf.get("bsp.peers.num"));
-      try {
-        fileSys = FileSystem.get(conf);
-      } catch (IOException e) {
-        e.printStackTrace();
+      bspPeer.sync();
+      IntegerMessage msg = null;
+      while ((msg = (IntegerMessage) bspPeer.getCurrentMessage()) != null) {
+        bspPeer.write(new IntWritable(msg.getData()), new Text(msg.getTag()));
       }
     }
   }