You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this text.
Posted to commits@hama.apache.org by ed...@apache.org on 2012/01/05 03:56:34 UTC

svn commit: r1227458 - in /incubator/hama/trunk: core/src/main/java/org/apache/hama/bsp/BSPJobClient.java examples/src/main/java/org/apache/hama/examples/ShortestPaths.java

Author: edwardyoon
Date: Thu Jan  5 02:56:34 2012
New Revision: 1227458

URL: http://svn.apache.org/viewvc?rev=1227458&view=rev
Log:
HAMA-476 Splitter doesn't work correctly

Modified:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1227458&r1=1227457&r2=1227458&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Thu Jan  5 02:56:34 2012
@@ -86,7 +86,7 @@ public class BSPJobClient extends Config
     JobProfile profile;
     JobStatus status;
     long statustime;
-    
+
     public NetworkedJob() {
     }
 
@@ -305,14 +305,20 @@ public class BSPJobClient extends Config
     fs.mkdirs(submitJobDir);
     short replication = (short) job.getInt("bsp.submit.replication", 10);
 
+    ClusterStatus clusterStatus = getClusterStatus(true);
+    int maxTasks = clusterStatus.getMaxTasks();
+    if (maxTasks < job.getNumBspTask()) {
+      job.setNumBspTask(maxTasks);
+    }
+
     // only create the splits if we have an input
     if (job.get("bsp.input.dir") != null) {
       // Create the splits for the job
       LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
       if (job.getConf().get("bsp.input.partitioner.class") != null) {
-        job = partition(job);
+        job = partition(job, maxTasks);
       }
-      job.setNumBspTask(writeSplits(job, submitSplitFile));
+      job.setNumBspTask(writeSplits(job, submitSplitFile, maxTasks));
       job.set("bsp.job.split.file", submitSplitFile.toString());
     }
 
@@ -368,9 +374,12 @@ public class BSPJobClient extends Config
   }
 
   @SuppressWarnings({ "rawtypes", "unchecked" })
-  protected BSPJob partition(BSPJob job) throws IOException {
-    InputSplit[] splits = job.getInputFormat().getSplits(job, 0);
-    int numOfTasks = splits.length; // job.getNumBspTask();
+  protected BSPJob partition(BSPJob job, int maxTasks) throws IOException {
+    InputSplit[] splits = job.getInputFormat().getSplits(
+        job,
+        (isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
+            : maxTasks);
+
     String input = job.getConf().get("bsp.input.dir");
 
     if (input != null) {
@@ -396,7 +405,7 @@ public class BSPJobClient extends Config
       RecordReader sampleReader = inputFormat.getRecordReader(splits[0], job);
 
       List<SequenceFile.Writer> writers = new ArrayList<SequenceFile.Writer>(
-          numOfTasks);
+          splits.length);
 
       CompressionType compressionType = getOutputCompressionType(job);
       Class<? extends CompressionCodec> outputCompressorClass = getOutputCompressorClass(
@@ -408,7 +417,7 @@ public class BSPJobClient extends Config
       }
 
       try {
-        for (int i = 0; i < numOfTasks; i++) {
+        for (int i = 0; i < splits.length; i++) {
           Path p = new Path(partitionedPath, getPartitionName(i));
           if (codec == null) {
             writers.add(SequenceFile.createWriter(fs, job.getConf(), p,
@@ -429,7 +438,7 @@ public class BSPJobClient extends Config
           Object value = recordReader.createValue();
           while (recordReader.next(key, value)) {
             int index = Math.abs(partitioner.getPartition(key, value,
-                numOfTasks));
+                splits.length));
             writers.get(index).append(key, value);
           }
           LOG.debug("Done with split " + i);
@@ -447,8 +456,12 @@ public class BSPJobClient extends Config
     return job;
   }
 
+  private boolean isProperSize(int numBspTask, int maxTasks) {
+    return (numBspTask > 1 && numBspTask < maxTasks);
+  }
+
   private String getPartitionName(int i) {
-    return "part-"  + String.valueOf(100000 + i).substring(1, 6);
+    return "part-" + String.valueOf(100000 + i).substring(1, 6);
   }
 
   /**
@@ -492,8 +505,12 @@ public class BSPJobClient extends Config
     return codecClass;
   }
 
-  private int writeSplits(BSPJob job, Path submitSplitFile) throws IOException {
-    InputSplit[] splits = job.getInputFormat().getSplits(job, 0);
+  private int writeSplits(BSPJob job, Path submitSplitFile, int maxTasks)
+      throws IOException {
+    InputSplit[] splits = job.getInputFormat().getSplits(
+        job,
+        (isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
+            : maxTasks);
 
     final DataOutputStream out = writeSplitsFileHeader(job.getConf(),
         submitSplitFile, splits.length);
@@ -589,7 +606,7 @@ public class BSPJobClient extends Config
         lastReport = report;
       }
     }
-    
+
     LOG.info("The total number of supersteps: " + info.getSuperstepCount());
     // TODO job.getCounters().log(LOG);
     return job.isSuccessful();

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java?rev=1227458&r1=1227457&r2=1227458&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java Thu Jan  5 02:56:34 2012
@@ -199,7 +199,7 @@ public class ShortestPaths extends
   }
 
   public static void printUsage() {
-    System.out.println("Usage: <startNode> <output path> <input path>");
+    System.out.println("Usage: <startNode> <output path> <input path> [numTasks]");
   }
 
   public static void main(String[] args) throws IOException,
@@ -221,6 +221,10 @@ public class ShortestPaths extends
     bsp.setOutputPath(new Path(args[1]));
     bsp.setInputPath(new Path(args[2]));
 
+    if(args.length == 4) {
+      bsp.setNumBspTask(Integer.parseInt(args[3]));
+    }
+    
     bsp.setBspClass(ShortestPaths.class);
     bsp.setInputFormat(SequenceFileInputFormat.class);
     bsp.setPartitioner(HashPartitioner.class);