You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2012/11/19 18:35:57 UTC

svn commit: r1411326 - in /hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ graph/src/main/java/org/apache/hama/graph/ ml/src/test/java/org/apache/hama/ml/kmeans/ yarn/src/main/java/org/apache/hama/bsp/

Author: tjungblut
Date: Mon Nov 19 17:35:55 2012
New Revision: 1411326

URL: http://svn.apache.org/viewvc?rev=1411326&view=rev
Log:
[HAMA-678]: fix input splitting

Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileInputFormat.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileRecordWriter.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/TextOutputFormat.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    hama/trunk/ml/src/test/java/org/apache/hama/ml/kmeans/TestKMeansBSP.java
    hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Mon Nov 19 17:35:55 2012
@@ -10,6 +10,7 @@ Release 0.6 (unreleased changes)
 
   BUG FIXES
 
+   HAMA-678: Fix input splitting (tjungblut)
    HAMA-613: Scheduler kills job too silently when out of slots (Yuesheng Hu via edwardyoon)
    HAMA-647: Make the input spliter robustly (Yuesheng Hu via edwardyoon)
    HAMA-635: Number of vertices value is inconsistent among tasks (Yuesheng Hu via tjungblut)

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java Mon Nov 19 17:35:55 2012
@@ -31,12 +31,14 @@ public abstract class BSP<K1, V1, K2, V2
     /**
      * {@inheritDoc}
      */
+    @Override
     public abstract void bsp(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException,
       SyncException, InterruptedException;
 
     /**
      * {@inheritDoc}
      */
+    @Override
     public void setup(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException,
       SyncException, InterruptedException {
 
@@ -45,6 +47,7 @@ public abstract class BSP<K1, V1, K2, V2
     /**
      * {@inheritDoc}
      */
+    @Override
     public void cleanup(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException {
 
   }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Mon Nov 19 17:35:55 2012
@@ -301,20 +301,8 @@ public class BSPJobClient extends Config
       throws IOException {
     BSPJob job = pJob;
     job.setJobID(jobId);
-    int maxTasks = 0;
-    int limitTasks = job.getConf().getInt(Constants.MAX_TASKS_PER_JOB, 0);
-    
-    ClusterStatus clusterStatus = getClusterStatus(true);
-    
-    if(limitTasks > 0) {
-      maxTasks = limitTasks;
-    } else {
-      maxTasks = clusterStatus.getMaxTasks() - clusterStatus.getTasks();
-    }
-    
-    if (maxTasks < job.getNumBspTask()) {
-      throw new IOException("Job failed! The number of tasks has exceeded the maximum allowed.");
-    }
+    int limitTasks = job.getConfiguration().getInt(Constants.MAX_TASKS_PER_JOB, 0);
+    int maxTasks = checkTaskLimits(job, limitTasks);
     
     Path submitJobDir = new Path(getSystemDir(), "submit_"
         + Integer.toString(Math.abs(r.nextInt()), 36));
@@ -337,8 +325,8 @@ public class BSPJobClient extends Config
     if (job.get("bsp.input.dir") != null) {
       // Create the splits for the job
       LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
-      if (job.getConf().get("bsp.input.partitioner.class") != null
-          && !job.getConf()
+      if (job.getConfiguration().get("bsp.input.partitioner.class") != null
+          && !job.getConfiguration()
               .getBoolean("hama.graph.runtime.partitioning", false)) {
         job = partition(job, maxTasks);
         maxTasks = job.getInt("hama.partition.count", maxTasks);
@@ -384,6 +372,22 @@ public class BSPJobClient extends Config
     return launchJob(jobId, job, submitJobFile, fs);
   }
 
+  protected int checkTaskLimits(BSPJob job, int limitTasks) throws IOException {
+    int maxTasks;
+    ClusterStatus clusterStatus = getClusterStatus(true);
+    
+    if(limitTasks > 0) {
+      maxTasks = limitTasks;
+    } else {
+      maxTasks = clusterStatus.getMaxTasks() - clusterStatus.getTasks();
+    }
+    
+    if (maxTasks < job.getNumBspTask()) {
+      throw new IOException("Job failed! The number of tasks has exceeded the maximum allowed.");
+    }
+    return maxTasks;
+  }
+
   protected RunningJob launchJob(BSPJobID jobId, BSPJob job,
       Path submitJobFile, FileSystem fs) throws IOException {
     //
@@ -405,7 +409,7 @@ public class BSPJobClient extends Config
         (isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
             : maxTasks);
 
-    String input = job.getConf().get("bsp.input.dir");
+    String input = job.getConfiguration().get("bsp.input.dir");
 
     if (input != null) {
       InputFormat<?, ?> inputFormat = job.getInputFormat();
@@ -438,18 +442,18 @@ public class BSPJobClient extends Config
       CompressionCodec codec = null;
       if (outputCompressorClass != null) {
         codec = ReflectionUtils.newInstance(outputCompressorClass,
-            job.getConf());
+            job.getConfiguration());
       }
 
       try {
         for (int i = 0; i < splits.length; i++) {
           Path p = new Path(partitionedPath, getPartitionName(i));
           if (codec == null) {
-            writers.add(SequenceFile.createWriter(fs, job.getConf(), p,
+            writers.add(SequenceFile.createWriter(fs, job.getConfiguration(), p,
                 sampleReader.createKey().getClass(), sampleReader.createValue()
                     .getClass(), CompressionType.NONE));
           } else {
-            writers.add(SequenceFile.createWriter(fs, job.getConf(), p,
+            writers.add(SequenceFile.createWriter(fs, job.getConfiguration(), p,
                 sampleReader.createKey().getClass(), sampleReader.createValue()
                     .getClass(), compressionType, codec));
           }
@@ -516,7 +520,7 @@ public class BSPJobClient extends Config
   static Class<? extends CompressionCodec> getOutputCompressorClass(BSPJob job,
       Class<? extends CompressionCodec> defaultValue) {
     Class<? extends CompressionCodec> codecClass = defaultValue;
-    Configuration conf = job.getConf();
+    Configuration conf = job.getConfiguration();
     String name = conf.get("bsp.partitioning.compression.codec");
     if (name != null) {
       try {
@@ -537,7 +541,7 @@ public class BSPJobClient extends Config
         (isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
             : maxTasks);
 
-    final DataOutputStream out = writeSplitsFileHeader(job.getConf(),
+    final DataOutputStream out = writeSplitsFileHeader(job.getConfiguration(),
         submitSplitFile, splits.length);
     try {
       DataOutputBuffer buffer = new DataOutputBuffer();
@@ -706,7 +710,7 @@ public class BSPJobClient extends Config
 
   public static void runJob(BSPJob job) throws FileNotFoundException,
       IOException {
-    BSPJobClient jc = new BSPJobClient(job.getConf());
+    BSPJobClient jc = new BSPJobClient(job.getConfiguration());
 
     if (job.getNumBspTask() == 0
         || job.getNumBspTask() > jc.getClusterStatus(false).getMaxTasks()) {

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java Mon Nov 19 17:35:55 2012
@@ -102,10 +102,18 @@ public class BSPJobContext {
     conf.writeXml(out);
   }
 
+  /**
+   * @deprecated use {@link #getConfiguration()} instead.
+   */
+  @Deprecated
   public Configuration getConf() {
     return this.conf;
   }
 
+  public Configuration getConfiguration() {
+    return this.conf;
+  }
+
   public String get(String name) {
     return conf.get(name);
   }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java Mon Nov 19 17:35:55 2012
@@ -114,7 +114,7 @@ public final class BSPTask extends Task 
 
   private void startPingingGroom(BSPJob job, BSPPeerProtocol umbilical) {
 
-    long pingPeriod = job.getConf().getLong(Constants.GROOM_PING_PERIOD,
+    long pingPeriod = job.getConfiguration().getLong(Constants.GROOM_PING_PERIOD,
         Constants.DEFAULT_GROOM_PING_PERIOD) / 2;
 
     try {
@@ -156,8 +156,8 @@ public final class BSPTask extends Task 
       throws Exception {
 
     BSP<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> bsp = (BSP<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M>) ReflectionUtils
-        .newInstance(job.getConf().getClass("bsp.work.class", BSP.class),
-            job.getConf());
+        .newInstance(job.getConfiguration().getClass("bsp.work.class", BSP.class),
+            job.getConfiguration());
 
     // The policy is to throw the first exception and log the remaining.
     Exception firstException = null;

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java Mon Nov 19 17:35:55 2012
@@ -140,7 +140,7 @@ public abstract class CombineFileInputFo
   public InputSplit[] getSplits(BSPJob bspJob, int numSplits)
       throws IOException {
 
-    Configuration job = bspJob.getConf();
+    Configuration job = bspJob.getConfiguration();
 
     long minSizeNode = 0;
     long minSizeRack = 0;
@@ -446,7 +446,7 @@ public abstract class CombineFileInputFo
       this.fileSize = 0;
 
       // get block locations from file system
-      FileSystem fs = path.getFileSystem(job.getConf());
+      FileSystem fs = path.getFileSystem(job.getConfiguration());
       FileStatus stat = fs.getFileStatus(path);
       BlockLocation[] locations = fs.getFileBlockLocations(stat, 0,
           stat.getLen());

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java Mon Nov 19 17:35:55 2012
@@ -86,7 +86,7 @@ public class CombineFileSplit implements
   }
 
   public Configuration getJob() {
-    return job.getConf();
+    return job.getConfiguration();
   }
 
   @Override

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java Mon Nov 19 17:35:55 2012
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.mapred.InvalidInputException;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
@@ -47,7 +48,6 @@ public abstract class FileInputFormat<K,
 
   private static final double SPLIT_SLOP = 1.1; // 10% slop
 
-  private long minSplitSize = 1;
   private static final PathFilter hiddenFileFilter = new PathFilter() {
     @Override
     public boolean accept(Path p) {
@@ -56,10 +56,6 @@ public abstract class FileInputFormat<K,
     }
   };
 
-  protected void setMinSplitSize(long minSplitSize) {
-    this.minSplitSize = minSplitSize;
-  }
-
   /**
    * Proxy PathFilter that accepts a path only if all filters given in the
    * constructor do. Used by the listPaths() to apply the built-in
@@ -83,15 +79,6 @@ public abstract class FileInputFormat<K,
     }
   }
 
-  /**
-   * @param fs the file system that the file is on
-   * @param filename the file name to check
-   * @return is this file splitable?
-   */
-  protected boolean isSplitable(FileSystem fs, Path filename) {
-    return true;
-  }
-
   @Override
   public abstract RecordReader<K, V> getRecordReader(InputSplit split,
       BSPJob job) throws IOException;
@@ -103,7 +90,7 @@ public abstract class FileInputFormat<K,
    */
   public static void setInputPathFilter(BSPJob conf,
       Class<? extends PathFilter> filter) {
-    conf.getConf().setClass("bsp.input.pathFilter.class", filter,
+    conf.getConfiguration().setClass("bsp.input.pathFilter.class", filter,
         PathFilter.class);
   }
 
@@ -113,10 +100,10 @@ public abstract class FileInputFormat<K,
    * @return the PathFilter instance set for the job, NULL if none has been set.
    */
   public static PathFilter getInputPathFilter(BSPJob conf) {
-    Class<? extends PathFilter> filterClass = conf.getConf().getClass(
+    Class<? extends PathFilter> filterClass = conf.getConfiguration().getClass(
         "bsp.input.pathFilter.class", null, PathFilter.class);
     return (filterClass != null) ? ReflectionUtils.newInstance(filterClass,
-        conf.getConf()) : null;
+        conf.getConfiguration()) : null;
   }
 
   /**
@@ -147,7 +134,7 @@ public abstract class FileInputFormat<K,
     PathFilter inputFilter = new MultiPathFilter(filters);
 
     for (Path p : dirs) {
-      FileSystem fs = p.getFileSystem(job.getConf());
+      FileSystem fs = p.getFileSystem(job.getConfiguration());
       FileStatus[] matches = fs.globStatus(p, inputFilter);
       if (matches == null) {
         errors.add(new IOException("Input path does not exist: " + p));
@@ -175,73 +162,109 @@ public abstract class FileInputFormat<K,
   }
 
   /**
-   * Splits files returned by {@link #listStatus(BSPJob)} when they're too big.
+   * Splits files returned by {@link #listStatus(BSPJob)} when they're too big. <br/>
+   * numSplits will be ignored by the framework.
    */
   @Override
   public InputSplit[] getSplits(BSPJob job, int numSplits) throws IOException {
-    FileStatus[] files = listStatus(job);
-
-    long totalSize = computeTotalSize(job, files);
-    long goalSize = computeGoalSize(numSplits, totalSize);
-
-    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
+    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
+    long maxSize = getMaxSplitSize(job);
 
-    // take the short circuit path if we have already partitioned
-    if (numSplits == files.length) {
-      for (FileStatus file : files) {
-        if (file != null) {
-          splits.add(new FileSplit(file.getPath(), 0, file.getLen(),
-              new String[0]));
+    // generate splits
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    FileStatus[] files = listStatus(job);
+    for (FileStatus file : files) {
+      Path path = file.getPath();
+      FileSystem fs = path.getFileSystem(job.getConfiguration());
+      long length = file.getLen();
+      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
+      if ((length != 0) && isSplitable(job, path)) {
+        long blockSize = file.getBlockSize();
+        long splitSize = computeSplitSize(blockSize, minSize, maxSize);
+
+        long bytesRemaining = length;
+        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
+          int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
+          splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
+              blkLocations[blkIndex].getHosts()));
+          bytesRemaining -= splitSize;
+        }
+
+        if (bytesRemaining != 0) {
+          splits
+              .add(new FileSplit(path, length - bytesRemaining, bytesRemaining,
+                  blkLocations[blkLocations.length - 1].getHosts()));
         }
+      } else if (length != 0) {
+        splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
+      } else {
+        // Create empty hosts array for zero length files
+        splits.add(new FileSplit(path, 0, length, new String[0]));
       }
-      return splits.toArray(new FileSplit[splits.size()]);
     }
 
-    LOG.info("numSplits: " + numSplits);
-    long minSize = Math.max(job.getConf().getLong("bsp.min.split.size", 1),
-        minSplitSize);
+    // Save the number of input files in the job-conf
+    job.getConfiguration().setLong("bsp.input.files", files.length);
 
-    // generate splits
-    NetworkTopology clusterMap = new NetworkTopology();
-    for (FileStatus file : files) {
-      if (file != null) {
-        Path path = file.getPath();
-        FileSystem fs = path.getFileSystem(job.getConf());
-        long length = file.getLen();
-        BlockLocation[] blkLocations = fs
-            .getFileBlockLocations(file, 0, length);
-        if ((length != 0) && isSplitable(fs, path)) {
-          long blockSize = file.getBlockSize();
-          long splitSize = computeSplitSize(goalSize, minSize, blockSize);
-          LOG.info("computeSplitSize: " + splitSize + " (" + goalSize + ", "
-              + minSize + ", " + blockSize + ")");
-
-          long bytesRemaining = length;
-          while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
-            String[] splitHosts = getSplitHosts(blkLocations, length
-                - bytesRemaining, splitSize, clusterMap);
-            splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
-                splitHosts));
-            bytesRemaining -= splitSize;
-          }
+    LOG.debug("Total # of splits: " + splits.size());
+    return splits.toArray(new InputSplit[splits.size()]);
+  }
 
-          if (bytesRemaining != 0) {
-            splits.add(new FileSplit(path, length - bytesRemaining,
-                bytesRemaining, blkLocations[blkLocations.length - 1]
-                    .getHosts()));
-          }
-        } else if (length != 0) {
-          String[] splitHosts = getSplitHosts(blkLocations, 0, length,
-              clusterMap);
-          splits.add(new FileSplit(path, 0, length, splitHosts));
-        } else {
-          // Create empty hosts array for zero length files
-          splits.add(new FileSplit(path, 0, length, new String[0]));
-        }
-      }
-    }
-    LOG.info("Total # of splits: " + splits.size());
-    return splits.toArray(new FileSplit[splits.size()]);
+  /**
+   * @return true if the file is splittable (default), false if not.
+   */
+  protected boolean isSplitable(BSPJob job, Path path) {
+    return true;
+  }
+
+  /**
+   * Get the lower bound on split size imposed by the format.
+   * 
+   * @return the number of bytes of the minimal split for this format
+   */
+  protected long getFormatMinSplitSize() {
+    return 1;
+  }
+
+  /**
+   * Set the minimum input split size
+   * 
+   * @param job the job to modify
+   * @param size the minimum size
+   */
+  public static void setMinInputSplitSize(Job job, long size) {
+    job.getConfiguration().setLong("bsp.min.split.size", size);
+  }
+
+  /**
+   * Get the minimum split size
+   * 
+   * @param job the job
+   * @return the minimum number of bytes that can be in a split
+   */
+  public static long getMinSplitSize(BSPJob job) {
+    return job.getConfiguration().getLong("bsp.min.split.size", 1L);
+  }
+
+  /**
+   * Set the maximum split size
+   * 
+   * @param job the job to modify
+   * @param size the maximum split size
+   */
+  public static void setMaxInputSplitSize(Job job, long size) {
+    job.getConfiguration().setLong("bsp.max.split.size", size);
+  }
+
+  /**
+   * Get the maximum split size.
+   * 
+   * @param context the job to look at.
+   * @return the maximum number of bytes a split can include
+   */
+  public static long getMaxSplitSize(BSPJob context) {
+    return context.getConfiguration().getLong("bsp.max.split.size",
+        Long.MAX_VALUE);
   }
 
   protected long computeTotalSize(BSPJob job, FileStatus[] files)
@@ -256,7 +279,7 @@ public abstract class FileInputFormat<K,
                 .equals(job.get("bsp.partitioning.dir")))) {
           // if we find the partitioning dir, just remove it.
           LOG.warn("Removing already existing partitioning directory " + path);
-          FileSystem fileSystem = path.getFileSystem(job.getConf());
+          FileSystem fileSystem = path.getFileSystem(job.getConfiguration());
           if (!fileSystem.delete(path, true)) {
             LOG.error("Remove failed.");
           }
@@ -403,7 +426,7 @@ public abstract class FileInputFormat<K,
    * @return the list of input {@link Path}s for the BSP job.
    */
   public static Path[] getInputPaths(BSPJob conf) {
-    String dirs = conf.getConf().get("bsp.input.dir", "");
+    String dirs = conf.getConfiguration().get("bsp.input.dir", "");
     String[] list = StringUtils.split(dirs);
     Path[] result = new Path[list.length];
     for (int i = 0; i < list.length; i++) {

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java Mon Nov 19 17:35:55 2012
@@ -36,7 +36,7 @@ public abstract class FileOutputFormat<K
    * @param compress should the output of the job be compressed?
    */
   public static void setCompressOutput(BSPJob conf, boolean compress) {
-    conf.getConf().setBoolean("bsp.output.compress", compress);
+    conf.getConfiguration().setBoolean("bsp.output.compress", compress);
   }
 
   /**
@@ -47,7 +47,7 @@ public abstract class FileOutputFormat<K
    *         <code>false</code> otherwise
    */
   public static boolean getCompressOutput(BSPJob conf) {
-    return conf.getConf().getBoolean("bsp.output.compress", false);
+    return conf.getConfiguration().getBoolean("bsp.output.compress", false);
   }
 
   /**
@@ -60,7 +60,7 @@ public abstract class FileOutputFormat<K
   public static void setOutputCompressorClass(BSPJob conf,
       Class<? extends CompressionCodec> codecClass) {
     setCompressOutput(conf, true);
-    conf.getConf().setClass("bsp.output.compression.codec", codecClass,
+    conf.getConfiguration().setClass("bsp.output.compression.codec", codecClass,
         CompressionCodec.class);
   }
 
@@ -79,7 +79,7 @@ public abstract class FileOutputFormat<K
     String name = conf.get("bsp.output.compression.codec");
     if (name != null) {
       try {
-        codecClass = conf.getConf().getClassByName(name)
+        codecClass = conf.getConfiguration().getClassByName(name)
             .asSubclass(CompressionCodec.class);
       } catch (ClassNotFoundException e) {
         throw new IllegalArgumentException("Compression codec " + name
@@ -98,7 +98,7 @@ public abstract class FileOutputFormat<K
       throw new InvalidJobConfException("Output directory not set in JobConf.");
     }
     if (outDir != null) {
-      FileSystem fs = outDir.getFileSystem(job.getConf());
+      FileSystem fs = outDir.getFileSystem(job.getConfiguration());
       // normalize the output directory
       outDir = fs.makeQualified(outDir);
       setOutputPath(job, outDir);

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Mon Nov 19 17:35:55 2012
@@ -1259,7 +1259,7 @@ public class GroomServer implements Runn
 
       try {
         // use job-specified working directory
-        FileSystem.get(job.getConf()).setWorkingDirectory(
+        FileSystem.get(job.getConfiguration()).setWorkingDirectory(
             job.getWorkingDirectory());
 
         // instantiate and init our peer

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java Mon Nov 19 17:35:55 2012
@@ -155,13 +155,13 @@ public class JobInProgress {
     FileSystem fs = jobDir.getFileSystem(conf);
     fs.copyToLocalFile(jobFile, localJobFile);
     BSPJob job = new BSPJob(jobId, localJobFile.toString());
-    this.jobSplit = job.getConf().get("bsp.job.split.file");
+    this.jobSplit = job.getConfiguration().get("bsp.job.split.file");
 
     this.numBSPTasks = job.getNumBspTask();
     this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>(
         numBSPTasks + 10);
 
-    this.maxTaskAttempts = job.getConf().getInt(Constants.MAX_TASK_ATTEMPTS,
+    this.maxTaskAttempts = job.getConfiguration().getInt(Constants.MAX_TASK_ATTEMPTS,
         Constants.DEFAULT_MAX_TASK_ATTEMPTS);
 
     this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(),

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Mon Nov 19 17:35:55 2012
@@ -230,7 +230,7 @@ public class LocalBSPRunner implements J
       conf.set(Constants.PEER_HOST, "local");
 
       bsp = (BSP) ReflectionUtils.newInstance(
-          job.getConf().getClass("bsp.work.class", BSP.class), job.getConf());
+          job.getConfiguration().getClass("bsp.work.class", BSP.class), job.getConfiguration());
 
     }
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java Mon Nov 19 17:35:55 2012
@@ -50,12 +50,14 @@ public class RecoverTaskAction extends G
     return superstepNumber.get();
   }
 
+  @Override
   public void write(DataOutput out) throws IOException {
     task.write(out);
     superstepNumber.write(out);
     
   }
 
+  @Override
   public void readFields(DataInput in) throws IOException {
     task = new BSPTask();
     task.readFields(in);

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileInputFormat.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileInputFormat.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileInputFormat.java Mon Nov 19 17:35:55 2012
@@ -19,17 +19,41 @@ package org.apache.hama.bsp;
 
 import java.io.IOException;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.SequenceFile;
 
 public class SequenceFileInputFormat<K, V> extends FileInputFormat<K, V> {
 
-  public SequenceFileInputFormat() {
-    setMinSplitSize(SequenceFile.SYNC_INTERVAL);
-  }
-
   @Override
   public RecordReader<K, V> getRecordReader(InputSplit split, BSPJob job)
       throws IOException {
-    return new SequenceFileRecordReader<K, V>(job.getConf(), (FileSplit) split);
+    return new SequenceFileRecordReader<K, V>(job.getConfiguration(),
+        (FileSplit) split);
+  }
+
+  @Override
+  protected long getFormatMinSplitSize() {
+    return SequenceFile.SYNC_INTERVAL;
   }
+
+  @Override
+  protected FileStatus[] listStatus(BSPJob job) throws IOException {
+
+    FileStatus[] files = super.listStatus(job);
+    int len = files.length;
+    for (int i = 0; i < len; ++i) {
+      FileStatus file = files[i];
+      if (file.isDir()) { // it's a MapFile
+        Path p = file.getPath();
+        FileSystem fs = p.getFileSystem(job.getConfiguration());
+        // use the data file
+        files[i] = fs.getFileStatus(new Path(p, MapFile.DATA_FILE_NAME));
+      }
+    }
+    return files;
+  }
+
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileRecordWriter.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileRecordWriter.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileRecordWriter.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileRecordWriter.java Mon Nov 19 17:35:55 2012
@@ -35,7 +35,7 @@ public class SequenceFileRecordWriter<K 
 
   public SequenceFileRecordWriter(FileSystem fs, BSPJob job, String name)
       throws IOException, ClassNotFoundException {
-    Configuration conf = job.getConf();
+    Configuration conf = job.getConfiguration();
     writer = new SequenceFile.Writer(fs, conf, new Path(
         conf.get("bsp.output.dir"), name), conf.getClassByName(conf
         .get("bsp.output.key.class")), conf.getClassByName(conf

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java Mon Nov 19 17:35:55 2012
@@ -327,6 +327,7 @@ class SimpleTaskScheduler extends TaskSc
       return Boolean.TRUE;
     }
 
+    @Override
     public Boolean call() {
       if (jip.isRecoveryPending()) {
         return scheduleRecoveryTasks();

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java Mon Nov 19 17:35:55 2012
@@ -95,7 +95,7 @@ public class TaskRunner extends Thread {
 
     @Override
     public Object call() throws Exception {
-      final boolean consoleRedirect = bspJob.getConf().getBoolean(
+      final boolean consoleRedirect = bspJob.getConfiguration().getBoolean(
           "hama.child.redirect.log.console", false);
       ProcessBuilder builder = new ProcessBuilder(commands);
       builder.directory(workDir);
@@ -203,7 +203,7 @@ public class TaskRunner extends Thread {
     vargs.add(jvm.toString());
 
     // bsp.child.java.opts
-    String javaOpts = jobConf.getConf().get("bsp.child.java.opts", "-Xmx200m");
+    String javaOpts = jobConf.getConfiguration().get("bsp.child.java.opts", "-Xmx200m");
     javaOpts = javaOpts.replace("@taskid@", task.getTaskID().toString());
 
     String[] javaOptsSplit = javaOpts.split(" ");
@@ -309,7 +309,7 @@ public class TaskRunner extends Thread {
   private void logStream(InputStream input, LogType type) {
     if (type == LogType.CONSOLE) {
       try {
-        IOUtils.copyBytes(input, System.out, bspJob.getConf());
+        IOUtils.copyBytes(input, System.out, bspJob.getConfiguration());
       } catch (IOException e) {
         // gracefully ignore any occuring exceptions here
       }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java Mon Nov 19 17:35:55 2012
@@ -19,15 +19,25 @@ package org.apache.hama.bsp;
 
 import java.io.IOException;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
 
 public class TextInputFormat extends FileInputFormat<LongWritable, Text> {
 
   @Override
   public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
       BSPJob job) throws IOException {
-    return new LineRecordReader(job.getConf(), (FileSplit) split);
+    return new LineRecordReader(job.getConfiguration(), (FileSplit) split);
+  }
+
+  @Override
+  protected boolean isSplitable(BSPJob job, Path path) {
+    CompressionCodec codec = new CompressionCodecFactory(job.getConfiguration())
+        .getCodec(path);
+    return codec == null;
   }
 
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/TextOutputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextOutputFormat.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/TextOutputFormat.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/TextOutputFormat.java Mon Nov 19 17:35:55 2012
@@ -104,11 +104,11 @@ public class TextOutputFormat<K, V> exte
   public RecordWriter<K, V> getRecordWriter(FileSystem ignored, BSPJob job,
       String name) throws IOException {
     boolean isCompressed = getCompressOutput(job);
-    String keyValueSeparator = job.getConf().get(
+    String keyValueSeparator = job.getConfiguration().get(
         "bsp.textoutputformat.separator", "\t");
     if (!isCompressed) {
       Path file = FileOutputFormat.getTaskOutputPath(job, name);
-      FileSystem fs = file.getFileSystem(job.getConf());
+      FileSystem fs = file.getFileSystem(job.getConfiguration());
       FSDataOutputStream fileOut = fs.create(file);
       return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
     } else {
@@ -116,11 +116,11 @@ public class TextOutputFormat<K, V> exte
           job, GzipCodec.class);
       // create the named codec
       CompressionCodec codec = ReflectionUtils.newInstance(codecClass,
-          job.getConf());
+          job.getConfiguration());
       // build the filename including the extension
       Path file = FileOutputFormat.getTaskOutputPath(job,
           name + codec.getDefaultExtension());
-      FileSystem fs = file.getFileSystem(job.getConf());
+      FileSystem fs = file.getFileSystem(job.getConfiguration());
       FSDataOutputStream fileOut = fs.create(file);
       return new LineRecordWriter<K, V>(new DataOutputStream(
           codec.createOutputStream(fileOut)), keyValueSeparator);

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Mon Nov 19 17:35:55 2012
@@ -509,7 +509,7 @@ public final class GraphJobRunner<V exte
         }
       }
 
-      int steps = (int) (maxSplitSize / conf.getInt( // 20 mb
+      int steps = (int) (maxSplitSize / conf.getLong( // 20 mb
           "hama.graph.multi.step.partitioning.interval", 20000000)) + 1;
 
       for (String peerName : peer.getAllPeerNames()) {

Modified: hama/trunk/ml/src/test/java/org/apache/hama/ml/kmeans/TestKMeansBSP.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/test/java/org/apache/hama/ml/kmeans/TestKMeansBSP.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/ml/src/test/java/org/apache/hama/ml/kmeans/TestKMeansBSP.java (original)
+++ hama/trunk/ml/src/test/java/org/apache/hama/ml/kmeans/TestKMeansBSP.java Mon Nov 19 17:35:55 2012
@@ -78,8 +78,8 @@ public class TestKMeansBSP extends TestC
       System.out.println(centerMap);
       assertEquals(1, centerMap.size());
       DoubleVector doubleVector = centerMap.get(0);
-      assertTrue(doubleVector.get(0) > 50 && doubleVector.get(0) < 51);
-      assertTrue(doubleVector.get(1) > 50 && doubleVector.get(1) < 51);
+      assertTrue(doubleVector.get(0) >= 50 && doubleVector.get(0) < 51);
+      assertTrue(doubleVector.get(1) >= 50 && doubleVector.get(1) < 51);
     } finally {
       fs.delete(new Path("/tmp/clustering"), true);
     }

Modified: hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java (original)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java Mon Nov 19 17:35:55 2012
@@ -190,6 +190,11 @@ public class YARNBSPJobClient extends BS
   }
 
   @Override
+  protected int checkTaskLimits(BSPJob job, int limitTasks) throws IOException {
+    return Math.max(1, limitTasks);
+  }
+
+  @Override
   public Path getSystemDir() {
     return new Path(getConf().get("bsp.local.dir", "/tmp/hama-yarn/"));
   }