You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2013/02/21 07:38:36 UTC

svn commit: r1448523 [1/4] - in /hama/trunk: ./ core/src/main/java/org/apache/hama/ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/ft/ core/src/main/java/org/apache/hama/bsp/message/compress/ core/src/main/java/org/apach...

Author: tjungblut
Date: Thu Feb 21 06:38:33 2013
New Revision: 1448523

URL: http://svn.apache.org/r1448523
Log:
[HAMA-735]: Tighten the graph API

Removed:
    hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/HamaConfiguration.java
    hama/trunk/core/src/main/java/org/apache/hama/HamaVersionAnnotation.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPInterface.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobID.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/Counters.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/Directive.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerStatus.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/QueueManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/TextArrayWritable.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/UpdatePeerAction.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/BSPFaultTolerantService.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPCompressedBundle.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressor.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/BufferedReadStatus.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferInputStream.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferOutputStream.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/CombineSpilledDataProcessor.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DirectByteBufferOutputStream.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DualChannelByteBufferStream.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DuplexByteArrayChannel.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/PreFetchCache.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReadIndexStatus.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledByteBuffer.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataReadStatus.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncFlushByteBufferOutputStream.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncReadByteBufferInputStream.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/WriteSpilledDataProcessor.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageTransferQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/BSPMasterSyncClient.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/MasterSyncClient.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncEvent.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncEventListener.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncException.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServerRunner.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncClient.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventFactory.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventListener.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncServerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/taskallocation/RawSplitResource.java
    hama/trunk/core/src/main/java/org/apache/hama/http/HttpServer.java
    hama/trunk/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java
    hama/trunk/core/src/main/java/org/apache/hama/ipc/GroomProtocol.java
    hama/trunk/core/src/main/java/org/apache/hama/ipc/MasterProtocol.java
    hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java
    hama/trunk/core/src/main/java/org/apache/hama/monitor/MetricsRecord.java
    hama/trunk/core/src/main/java/org/apache/hama/monitor/MetricsTag.java
    hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java
    hama/trunk/core/src/main/java/org/apache/hama/monitor/MonitorListener.java
    hama/trunk/core/src/main/java/org/apache/hama/monitor/ZKCollector.java
    hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/NodeEventListener.java
    hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/NodeStatus.java
    hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Sensor.java
    hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Supervisor.java
    hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSensor.java
    hama/trunk/core/src/main/java/org/apache/hama/pipes/BinaryProtocol.java
    hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java
    hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java
    hama/trunk/core/src/main/java/org/apache/hama/util/BSPServletUtil.java
    hama/trunk/core/src/main/java/org/apache/hama/util/Bytes.java
    hama/trunk/core/src/main/java/org/apache/hama/util/ClusterUtil.java
    hama/trunk/core/src/main/java/org/apache/hama/util/LRUCache.java
    hama/trunk/core/src/main/java/org/apache/hama/util/RandomVariable.java
    hama/trunk/core/src/main/java/org/apache/hama/util/ReflectionUtils.java
    hama/trunk/core/src/main/java/org/apache/hama/util/RunJar.java
    hama/trunk/core/src/main/java/org/apache/hama/util/StringArrayWritable.java
    hama/trunk/core/src/main/java/org/apache/hama/util/VersionInfo.java
    hama/trunk/core/src/main/java/org/apache/hama/zookeeper/ZKServerTool.java
    hama/trunk/core/src/test/java/org/apache/hama/HamaCluster.java
    hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java
    hama/trunk/core/src/test/java/org/apache/hama/HamaTestCase.java
    hama/trunk/core/src/test/java/org/apache/hama/MiniBSPCluster.java
    hama/trunk/core/src/test/java/org/apache/hama/MiniZooKeeperCluster.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestFileInputFormat.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestSpillingQueue.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java
    hama/trunk/core/src/test/java/org/apache/hama/examples/ClassSerializePrinting.java
    hama/trunk/core/src/test/java/org/apache/hama/ipc/TestIPC.java
    hama/trunk/core/src/test/java/org/apache/hama/ipc/TestRPC.java
    hama/trunk/core/src/test/java/org/apache/hama/monitor/TestConfigurator.java
    hama/trunk/core/src/test/java/org/apache/hama/monitor/fd/TestFD.java
    hama/trunk/core/src/test/java/org/apache/hama/util/TestBytes.java
    hama/trunk/core/src/test/java/org/apache/hama/util/TestRandomVariable.java
    hama/trunk/core/src/test/java/org/apache/hama/zookeeper/TestZKTools.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/GradientDescentExample.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/Kmeans.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/SpMV.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/util/DenseVectorWritable.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/util/Generator.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/util/SparseVectorWritable.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/util/SymmetricMatrixGen.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/CombineExampleTest.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/LinearRegressionTest.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/LogisticRegressionTest.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/PiEstimatorTest.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/RandBenchTest.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/SSSPTest.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/SymmetricMatrixGenTest.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Thu Feb 21 06:38:33 2013
@@ -18,6 +18,7 @@ Release 0.7 (unreleased changes)
 
   IMPROVEMENTS
 
+   HAMA-735: Tighten the graph API (tjungblut)
    HAMA-714: Align format consistency between examples and generators (edwardyoon)
    HAMA-531: Reimplementation of partitioner (edwardyoon)
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/HamaConfiguration.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/HamaConfiguration.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/HamaConfiguration.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/HamaConfiguration.java Thu Feb 21 06:38:33 2013
@@ -38,7 +38,7 @@ public class HamaConfiguration extends C
     super();
     this.addResource(confFile);
   }
-  
+
   /**
    * Create a clone of passed configuration.
    * 

Modified: hama/trunk/core/src/main/java/org/apache/hama/HamaVersionAnnotation.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/HamaVersionAnnotation.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/HamaVersionAnnotation.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/HamaVersionAnnotation.java Thu Feb 21 06:38:33 2013
@@ -23,18 +23,17 @@ import java.lang.annotation.RetentionPol
 import java.lang.annotation.Target;
 
 /**
- * An attribute that 
- * holds the information on version and build information of a
+ * An attribute that holds the information on version and build information of a
  * Hama package that a potential user is working with.
  */
 
 @Retention(RetentionPolicy.RUNTIME)
 @Target(ElementType.PACKAGE)
-
 public @interface HamaVersionAnnotation {
 
   /**
    * Get the Hama version
+   * 
    * @return the version string "0.3.3-dev"
    */
   String version();
@@ -46,6 +45,7 @@ public @interface HamaVersionAnnotation 
 
   /**
    * Get the date when Hama was compiled.
+   * 
    * @return the date in unix 'date' format
    */
   String date();
@@ -57,23 +57,23 @@ public @interface HamaVersionAnnotation 
 
   /**
    * Get the subversion revision.
+   * 
    * @return the revision number as a string (eg. "451451")
    */
   String revision();
 
   /**
    * Get the branch from which this was compiled.
+   * 
    * @return The branch name, e.g. "trunk" or "branches/branch-0.20"
    */
   String branch();
 
   /**
-   * Get a checksum of the source files from which
-   * Hama was compiled.
+   * Get a checksum of the source files from which Hama was compiled.
+   * 
    * @return a string that uniquely identifies the source
    **/
-  String srcChecksum(); 	
-
-
+  String srcChecksum();
 
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java Thu Feb 21 06:38:33 2013
@@ -28,27 +28,27 @@ import org.apache.hama.bsp.sync.SyncExce
 public abstract class BSP<K1, V1, K2, V2, M extends Writable> implements
     BSPInterface<K1, V1, K2, V2, M> {
 
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public abstract void bsp(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException,
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public abstract void bsp(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException,
       SyncException, InterruptedException;
 
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void setup(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException,
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void setup(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException,
       SyncException, InterruptedException {
 
   }
 
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void cleanup(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException {
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void cleanup(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException {
 
   }
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPInterface.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPInterface.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPInterface.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPInterface.java Thu Feb 21 06:38:33 2013
@@ -23,42 +23,42 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.sync.SyncException;
 
 /**
- * The {@link BSPInterface} defines the basic operations needed to implement a BSP
- * based algorithm.
- * The implementing algorithm takes {@link BSPPeer}s as parameters which are
- * responsible for communication, reading K1-V1 inputs, collecting k2-V2 outputs
- * and exchanging messages of type M.
+ * The {@link BSPInterface} defines the basic operations needed to implement a
+ * BSP based algorithm. The implementing algorithm takes {@link BSPPeer}s as
+ * parameters which are responsible for communication, reading K1-V1 inputs,
+ * collecting k2-V2 outputs and exchanging messages of type M.
  */
 public interface BSPInterface<K1, V1, K2, V2, M extends Writable> {
 
-    /**
-     * This method is your computation method, the main work of your BSP should be
-     * done here.
-     *
-     * @param peer Your BSPPeer instance.
-     * @throws java.io.IOException
-     * @throws org.apache.hama.bsp.sync.SyncException
-     * @throws InterruptedException
-     */
-    public void bsp(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException, SyncException, InterruptedException;
+  /**
+   * This method is your computation method, the main work of your BSP should be
+   * done here.
+   * 
+   * @param peer Your BSPPeer instance.
+   * @throws java.io.IOException
+   * @throws org.apache.hama.bsp.sync.SyncException
+   * @throws InterruptedException
+   */
+  public void bsp(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException,
+      SyncException, InterruptedException;
 
-    /**
-     * This method is called before the BSP method. It can be used for setup
-     * purposes.
-     *
-     * @param peer Your BSPPeer instance.
-     * @throws IOException
-     */
-    public void setup(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException,
-            SyncException, InterruptedException;
+  /**
+   * This method is called before the BSP method. It can be used for setup
+   * purposes.
+   * 
+   * @param peer Your BSPPeer instance.
+   * @throws IOException
+   */
+  public void setup(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException,
+      SyncException, InterruptedException;
 
-    /**
-     * This method is called after the BSP method. It can be used for cleanup
-     * purposes. Cleanup is guranteed to be called after the BSP runs, even in
-     * case of exceptions.
-     *
-     * @param peer Your BSPPeer instance.
-     * @throws IOException
-     */
-    public void cleanup(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException;
+  /**
+   * This method is called after the BSP method. It can be used for cleanup
+   * purposes. Cleanup is guranteed to be called after the BSP runs, even in
+   * case of exceptions.
+   * 
+   * @param peer Your BSPPeer instance.
+   * @throws IOException
+   */
+  public void cleanup(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException;
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Thu Feb 21 06:38:33 2013
@@ -252,8 +252,9 @@ public class BSPJob extends BSPJobContex
 
   @SuppressWarnings({ "rawtypes" })
   public InputFormat getInputFormat() {
-    return ReflectionUtils.newInstance(conf.getClass(Constants.INPUT_FORMAT_CLASS,
-        TextInputFormat.class, InputFormat.class), conf);
+    return ReflectionUtils.newInstance(
+        conf.getClass(Constants.INPUT_FORMAT_CLASS, TextInputFormat.class,
+            InputFormat.class), conf);
   }
 
   @SuppressWarnings({ "rawtypes" })
@@ -374,19 +375,22 @@ public class BSPJob extends BSPJobContex
    */
   @SuppressWarnings("rawtypes")
   public void setPartitioner(Class<? extends Partitioner> theClass) {
-    conf.setClass(Constants.RUNTIME_PARTITIONING_CLASS, theClass, Partitioner.class);
+    conf.setClass(Constants.RUNTIME_PARTITIONING_CLASS, theClass,
+        Partitioner.class);
   }
 
   @SuppressWarnings("rawtypes")
   public Partitioner getPartitioner() {
-    return ReflectionUtils.newInstance(
-        conf.getClass(Constants.RUNTIME_PARTITIONING_CLASS, HashPartitioner.class,Partitioner.class), conf);
+    return ReflectionUtils.newInstance(conf.getClass(
+        Constants.RUNTIME_PARTITIONING_CLASS, HashPartitioner.class,
+        Partitioner.class), conf);
   }
 
   @SuppressWarnings("rawtypes")
   public OutputFormat getOutputFormat() {
-    return ReflectionUtils.newInstance(conf.getClass(Constants.OUTPUT_FORMAT_CLASS,
-        TextOutputFormat.class, OutputFormat.class), conf);
+    return ReflectionUtils.newInstance(conf.getClass(
+        Constants.OUTPUT_FORMAT_CLASS, TextOutputFormat.class,
+        OutputFormat.class), conf);
   }
 
   protected void setCheckPointInterval(int checkPointInterval) {

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Thu Feb 21 06:38:33 2013
@@ -375,13 +375,13 @@ public class BSPJobClient extends Config
     if (fs.isFile(inputDir)) {
       inputDir = inputDir.getParent();
     }
-    
+
     Path partitionDir = new Path(inputDir + "/partitions");
 
     if (fs.exists(partitionDir)) {
       fs.delete(partitionDir, true);
     }
-    
+
     if (job.get("bsp.partitioning.runner.job") != null) {
       return job;
     }// Early exit for the partitioner job.
@@ -477,7 +477,9 @@ public class BSPJobClient extends Config
 
     if (maxTasks < job.getNumBspTask()) {
       throw new IOException(
-          "Job failed! The number of tasks has exceeded the maximum allowed.");
+          "Job failed! The number of tasks has exceeded the maximum allowed. Maxtasks: "
+              + maxTasks + " < configured number of tasks: "
+              + job.getNumBspTask());
     }
     return maxTasks;
   }
@@ -560,7 +562,8 @@ public class BSPJobClient extends Config
             && job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_CLASS) != null
             && job.get("bsp.partitioning.runner.job") == null) {
           LOG.debug(((FileSplit) split).getPath().getName());
-          String[] extractPartitionID = ((FileSplit) split).getPath().getName().split("[-]");
+          String[] extractPartitionID = ((FileSplit) split).getPath().getName()
+              .split("[-]");
           rawSplit.setPartitionID(Integer.parseInt(extractPartitionID[1]));
         }
 
@@ -1122,9 +1125,9 @@ public class BSPJobClient extends Config
       out.writeInt(partitionID);
       bytes.write(out);
       WritableUtils.writeVInt(out, locations.length);
-        for (String location : locations) {
-            Text.writeString(out, location);
-        }
+      for (String location : locations) {
+        Text.writeString(out, location);
+      }
     }
 
     public long getDataLength() {

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobID.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobID.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobID.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobID.java Thu Feb 21 06:38:33 2013
@@ -18,7 +18,6 @@
 package org.apache.hama.bsp;
 
 import java.io.DataInput;
-
 import java.io.DataOutput;
 import java.io.IOException;
 import java.text.NumberFormat;

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java Thu Feb 21 06:38:33 2013
@@ -65,15 +65,15 @@ import org.apache.zookeeper.Watcher;
  * BSPMaster is responsible to control all the groom servers and to manage bsp
  * jobs. It has the following responsibilities:
  * <ol>
- * <li> <b>Job submission</b>. BSPMaster is responsible for accepting new job
+ * <li><b>Job submission</b>. BSPMaster is responsible for accepting new job
  * requests and assigning the job to scheduler for scheduling BSP Tasks defined
  * for the job.
- * <li> <b>GroomServer monitoring</b> BSPMaster keeps track of all the groom 
- * servers in the cluster. It is responsible for adding new grooms to the 
- * cluster and keeping a tab on all the grooms and could blacklist a groom if 
- * it get fails the availability requirement.
- * <li> BSPMaster keeps track of all the task status for each job and handles
- * the failure of job as requested by the jobs.  
+ * <li><b>GroomServer monitoring</b> BSPMaster keeps track of all the groom
+ * servers in the cluster. It is responsible for adding new grooms to the
+ * cluster and keeping a tab on all the grooms and could blacklist a groom if it
+ * get fails the availability requirement.
+ * <li>BSPMaster keeps track of all the task status for each job and handles the
+ * failure of job as requested by the jobs.
  * </ol>
  */
 public class BSPMaster implements JobSubmissionProtocol, MasterProtocol,
@@ -128,7 +128,8 @@ public class BSPMaster implements JobSub
 
   // Jobs' Meta Data
   private Integer nextJobId = 1;
-  private int totalSubmissions = 0; // how many jobs has been submitted by clients
+  private int totalSubmissions = 0; // how many jobs has been submitted by
+                                    // clients
   private int totalTasks = 0; // currnetly running tasks
   private int totalTaskCapacity; // max tasks that groom server can run
 
@@ -148,11 +149,10 @@ public class BSPMaster implements JobSub
   private final AtomicReference<Supervisor> supervisor = new AtomicReference<Supervisor>();
 
   /**
-   * ReportGroomStatusHandler keeps track of the status reported by each 
-   * Groomservers on the task they are executing currently. Based on the 
-   * status reported, it is responsible for issuing task recovery requests, 
-   * updating the job progress and other book keeping on currently running
-   * jobs. 
+   * ReportGroomStatusHandler keeps track of the status reported by each
+   * Groomservers on the task they are executing currently. Based on the status
+   * reported, it is responsible for issuing task recovery requests, updating
+   * the job progress and other book keeping on currently running jobs.
    */
   private class ReportGroomStatusHandler implements DirectiveHandler {
 
@@ -190,10 +190,9 @@ public class BSPMaster implements JobSub
               jip.getStatus().setProgress(ts.getSuperstepCount());
               jip.getStatus().setSuperstepCount(ts.getSuperstepCount());
             } else if (ts.getRunState() == TaskStatus.State.FAILED) {
-              if(jip.handleFailure(tip)){
+              if (jip.handleFailure(tip)) {
                 recoverTask(jip);
-              }
-              else {
+              } else {
                 jip.status.setRunState(JobStatus.FAILED);
                 jip.failedTask(tip, ts);
               }
@@ -210,7 +209,7 @@ public class BSPMaster implements JobSub
               jip.getStatus().setProgress(ts.getSuperstepCount());
               jip.getStatus().setSuperstepCount(ts.getSuperstepCount());
             } else if (jip.getStatus().getRunState() == JobStatus.KILLED) {
-              
+
               GroomProtocol worker = findGroomServer(tmpStatus);
               Directive d1 = new DispatchTasksDirective(
                   new GroomServerAction[] { new KillTaskAction(ts.getTaskId()) });
@@ -439,17 +438,17 @@ public class BSPMaster implements JobSub
 
   void deleteLocalFiles() throws IOException {
     String[] localDirs = getLocalDirs();
-      for (String localDir : localDirs) {
-          FileSystem.getLocal(conf).delete(new Path(localDir), true);
-      }
+    for (String localDir : localDirs) {
+      FileSystem.getLocal(conf).delete(new Path(localDir), true);
+    }
   }
 
   void deleteLocalFiles(String subdir) throws IOException {
     try {
       String[] localDirs = getLocalDirs();
-        for (String localDir : localDirs) {
-            FileSystem.getLocal(conf).delete(new Path(localDir, subdir), true);
-        }
+      for (String localDir : localDirs) {
+        FileSystem.getLocal(conf).delete(new Path(localDir, subdir), true);
+      }
     } catch (NullPointerException e) {
       LOG.info(e);
     }
@@ -465,6 +464,7 @@ public class BSPMaster implements JobSub
 
   /**
    * Starts the BSP Master process.
+   * 
    * @param conf The Hama configuration.
    * @return an instance of BSPMaster
    * @throws IOException
@@ -477,6 +477,7 @@ public class BSPMaster implements JobSub
 
   /**
    * Starts the BSP Master process
+   * 
    * @param conf The Hama configuration
    * @param identifier Identifier for the job.
    * @return
@@ -500,6 +501,7 @@ public class BSPMaster implements JobSub
 
   /**
    * Initialize the global synchronization client.
+   * 
    * @param conf Hama configuration.
    */
   private void initZK(HamaConfiguration conf) {
@@ -509,9 +511,10 @@ public class BSPMaster implements JobSub
 
   /**
    * Get a handle of the global synchronization client used by BSPMaster.
+   * 
    * @return The synchronization client.
    */
-  public MasterSyncClient getSyncClient(){
+  public MasterSyncClient getSyncClient() {
     return this.syncClient;
   }
 
@@ -604,9 +607,9 @@ public class BSPMaster implements JobSub
     JobInProgress job = new JobInProgress(jobID, new Path(jobFile), this,
         this.conf);
     ++totalSubmissions;
-    if(LOG.isDebugEnabled()){
-      LOG.debug("Submitting job number = " + totalSubmissions + 
-          " id = " + job.getJobID());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Submitting job number = " + totalSubmissions + " id = "
+          + job.getJobID());
     }
     return addJob(jobID, job);
   }
@@ -755,10 +758,10 @@ public class BSPMaster implements JobSub
     }
     return job.getStatus();
   }
-  
+
   /**
-   * Recovers task in job. To be called when a particular task in a job has failed 
-   * and there is a need to schedule it on a machine.
+   * Recovers task in job. To be called when a particular task in a job has
+   * failed and there is a need to schedule it on a machine.
    */
   private synchronized void recoverTask(JobInProgress job) {
     ++totalSubmissions;
@@ -893,7 +896,7 @@ public class BSPMaster implements JobSub
     try {
       this.syncClient.close();
     } catch (IOException e) {
-      LOG.error("Error closing the sync client",e);
+      LOG.error("Error closing the sync client", e);
     }
     if (null != this.supervisor.get()) {
       this.supervisor.get().stop();

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Thu Feb 21 06:38:33 2013
@@ -58,7 +58,10 @@ public final class BSPPeerImpl<K1, V1, K
   private static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
 
   public static enum PeerCounter {
-    COMPRESSED_MESSAGES, SUPERSTEP_SUM, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS, IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED, TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, COMPRESSED_BYTES_SENT, COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS
+    COMPRESSED_MESSAGES, SUPERSTEP_SUM, TASK_INPUT_RECORDS,
+    TASK_OUTPUT_RECORDS, IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED,
+    MESSAGE_BYTES_RECEIVED, TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED,
+    COMPRESSED_BYTES_SENT, COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS
   }
 
   private final Configuration conf;

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java Thu Feb 21 06:38:33 2013
@@ -69,7 +69,8 @@ public final class BSPTask extends Task 
       boolean shouldKillSelf = false;
       try {
         if (LOG.isDebugEnabled())
-          LOG.debug("Pinging at time " + Calendar.getInstance().getTimeInMillis());
+          LOG.debug("Pinging at time "
+              + Calendar.getInstance().getTimeInMillis());
         // if the RPC call returns false, it means that groomserver does not
         // have knowledge of this task.
         shouldKillSelf = !(pingRPC.ping(taskId) && bspThread.isAlive());
@@ -114,8 +115,8 @@ public final class BSPTask extends Task 
 
   private void startPingingGroom(BSPJob job, BSPPeerProtocol umbilical) {
 
-    long pingPeriod = job.getConfiguration().getLong(Constants.GROOM_PING_PERIOD,
-        Constants.DEFAULT_GROOM_PING_PERIOD) / 2;
+    long pingPeriod = job.getConfiguration().getLong(
+        Constants.GROOM_PING_PERIOD, Constants.DEFAULT_GROOM_PING_PERIOD) / 2;
 
     try {
       if (pingPeriod > 0) {
@@ -156,7 +157,8 @@ public final class BSPTask extends Task 
       throws Exception {
 
     BSP<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> bsp = (BSP<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M>) ReflectionUtils
-        .newInstance(job.getConfiguration().getClass("bsp.work.class", BSP.class),
+        .newInstance(
+            job.getConfiguration().getClass("bsp.work.class", BSP.class),
             job.getConfiguration());
 
     // The policy is to throw the first exception and log the remaining.

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java Thu Feb 21 06:38:33 2013
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -35,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.net.NetworkTopology;
@@ -210,12 +210,12 @@ public abstract class CombineFileInputFo
 
     // Finally, process all paths that do not belong to any pool.
     ArrayList<Path> myPaths = new ArrayList<Path>();
-      for (Path path : paths) {
-          if (path == null) { // already processed
-              continue;
-          }
-          myPaths.add(path);
+    for (Path path : paths) {
+      if (path == null) { // already processed
+        continue;
       }
+      myPaths.add(path);
+    }
     // create splits for all files that are not in any pool.
     getMoreSplits(bspJob, myPaths.toArray(new Path[myPaths.size()]), maxSize,
         minSizeNode, minSizeRack, splits);
@@ -261,48 +261,47 @@ public abstract class CombineFileInputFo
 
     // process all nodes and create splits that are local
     // to a node.
-      for (Map.Entry<String, List<OneBlockInfo>> one : nodeToBlocks
-              .entrySet()) {
+    for (Map.Entry<String, List<OneBlockInfo>> one : nodeToBlocks.entrySet()) {
 
-          nodes.add(one.getKey());
-          List<OneBlockInfo> blocksInNode = one.getValue();
+      nodes.add(one.getKey());
+      List<OneBlockInfo> blocksInNode = one.getValue();
 
-          // for each block, copy it into validBlocks. Delete it from
-          // blockToNodes so that the same block does not appear in
-          // two different splits.
-          for (OneBlockInfo oneblock : blocksInNode) {
-              if (blockToNodes.containsKey(oneblock)) {
-                  validBlocks.add(oneblock);
-                  blockToNodes.remove(oneblock);
-                  curSplitSize += oneblock.length;
-
-                  // if the accumulated split size exceeds the maximum, then
-                  // create this split.
-                  if (maxSize != 0 && curSplitSize >= maxSize) {
-                      // create an input split and add it to the splits array
-                      addCreatedSplit(job, splits, nodes, validBlocks);
-                      curSplitSize = 0;
-                      validBlocks.clear();
-                  }
-              }
-          }
-          // if there were any blocks left over and their combined size is
-          // larger than minSplitNode, then combine them into one split.
-          // Otherwise add them back to the unprocessed pool. It is likely
-          // that they will be combined with other blocks from the same rack later
-          // on.
-          if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
-              // create an input split and add it to the splits array
-              addCreatedSplit(job, splits, nodes, validBlocks);
-          } else {
-              for (OneBlockInfo oneblock : validBlocks) {
-                  blockToNodes.put(oneblock, oneblock.hosts);
-              }
+      // for each block, copy it into validBlocks. Delete it from
+      // blockToNodes so that the same block does not appear in
+      // two different splits.
+      for (OneBlockInfo oneblock : blocksInNode) {
+        if (blockToNodes.containsKey(oneblock)) {
+          validBlocks.add(oneblock);
+          blockToNodes.remove(oneblock);
+          curSplitSize += oneblock.length;
+
+          // if the accumulated split size exceeds the maximum, then
+          // create this split.
+          if (maxSize != 0 && curSplitSize >= maxSize) {
+            // create an input split and add it to the splits array
+            addCreatedSplit(job, splits, nodes, validBlocks);
+            curSplitSize = 0;
+            validBlocks.clear();
           }
-          validBlocks.clear();
-          nodes.clear();
-          curSplitSize = 0;
+        }
+      }
+      // if there were any blocks left over and their combined size is
+      // larger than minSplitNode, then combine them into one split.
+      // Otherwise add them back to the unprocessed pool. It is likely
+      // that they will be combined with other blocks from the same rack later
+      // on.
+      if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
+        // create an input split and add it to the splits array
+        addCreatedSplit(job, splits, nodes, validBlocks);
+      } else {
+        for (OneBlockInfo oneblock : validBlocks) {
+          blockToNodes.put(oneblock, oneblock.hosts);
+        }
       }
+      validBlocks.clear();
+      nodes.clear();
+      curSplitSize = 0;
+    }
 
     // if blocks in a rack are below the specified minimum size, then keep them
     // in 'overflow'. After the processing of all racks is complete, these
@@ -322,57 +321,56 @@ public abstract class CombineFileInputFo
       // split size).
 
       // iterate over all racks
-        for (Map.Entry<String, List<OneBlockInfo>> one : rackToBlocks
-                .entrySet()) {
+      for (Map.Entry<String, List<OneBlockInfo>> one : rackToBlocks.entrySet()) {
 
-            racks.add(one.getKey());
-            List<OneBlockInfo> blocks = one.getValue();
+        racks.add(one.getKey());
+        List<OneBlockInfo> blocks = one.getValue();
 
-            // for each block, copy it into validBlocks. Delete it from
-            // blockToNodes so that the same block does not appear in
-            // two different splits.
-            boolean createdSplit = false;
-            for (OneBlockInfo oneblock : blocks) {
-                if (blockToNodes.containsKey(oneblock)) {
-                    validBlocks.add(oneblock);
-                    blockToNodes.remove(oneblock);
-                    curSplitSize += oneblock.length;
-
-                    // if the accumulated split size exceeds the maximum, then
-                    // create this split.
-                    if (maxSize != 0 && curSplitSize >= maxSize) {
-                        // create an input split and add it to the splits array
-                        addCreatedSplit(job, splits, getHosts(racks), validBlocks);
-                        createdSplit = true;
-                        break;
-                    }
-                }
+        // for each block, copy it into validBlocks. Delete it from
+        // blockToNodes so that the same block does not appear in
+        // two different splits.
+        boolean createdSplit = false;
+        for (OneBlockInfo oneblock : blocks) {
+          if (blockToNodes.containsKey(oneblock)) {
+            validBlocks.add(oneblock);
+            blockToNodes.remove(oneblock);
+            curSplitSize += oneblock.length;
+
+            // if the accumulated split size exceeds the maximum, then
+            // create this split.
+            if (maxSize != 0 && curSplitSize >= maxSize) {
+              // create an input split and add it to the splits array
+              addCreatedSplit(job, splits, getHosts(racks), validBlocks);
+              createdSplit = true;
+              break;
             }
+          }
+        }
 
-            // if we created a split, then just go to the next rack
-            if (createdSplit) {
-                curSplitSize = 0;
-                validBlocks.clear();
-                racks.clear();
-                continue;
-            }
+        // if we created a split, then just go to the next rack
+        if (createdSplit) {
+          curSplitSize = 0;
+          validBlocks.clear();
+          racks.clear();
+          continue;
+        }
 
-            if (!validBlocks.isEmpty()) {
-                if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
-                    // if there is a mimimum size specified, then create a single split
-                    // otherwise, store these blocks into overflow data structure
-                    addCreatedSplit(job, splits, getHosts(racks), validBlocks);
-                } else {
-                    // There were a few blocks in this rack that remained to be
-                    // processed.
-                    // Keep them in 'overflow' block list. These will be combined later.
-                    overflowBlocks.addAll(validBlocks);
-                }
-            }
-            curSplitSize = 0;
-            validBlocks.clear();
-            racks.clear();
+        if (!validBlocks.isEmpty()) {
+          if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
+            // if there is a mimimum size specified, then create a single split
+            // otherwise, store these blocks into overflow data structure
+            addCreatedSplit(job, splits, getHosts(racks), validBlocks);
+          } else {
+            // There were a few blocks in this rack that remained to be
+            // processed.
+            // Keep them in 'overflow' block list. These will be combined later.
+            overflowBlocks.addAll(validBlocks);
+          }
         }
+        curSplitSize = 0;
+        validBlocks.clear();
+        racks.clear();
+      }
     }
 
     assert blockToNodes.isEmpty();
@@ -387,7 +385,7 @@ public abstract class CombineFileInputFo
 
       // This might cause an exiting rack location to be re-added,
       // but it should be ok.
-        Collections.addAll(racks, oneblock.racks);
+      Collections.addAll(racks, oneblock.racks);
 
       // if the accumulated split size exceeds the maximum, then
       // create this split.
@@ -425,7 +423,7 @@ public abstract class CombineFileInputFo
 
     // add this split to the list that is returned
     CombineFileSplit thissplit = new CombineFileSplit(job, fl, offset, length,
-            locations.toArray(new String[locations.size()]));
+        locations.toArray(new String[locations.size()]));
     splitList.add(thissplit);
   }
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java Thu Feb 21 06:38:33 2013
@@ -179,7 +179,8 @@ public class CombineFileSplit implements
       if (i == 0) {
         sb.append("Paths:");
       }
-      sb.append(paths[i].toUri().getPath()).append(":").append(startoffset[i]).append("+").append(lengths[i]);
+      sb.append(paths[i].toUri().getPath()).append(":").append(startoffset[i])
+          .append("+").append(lengths[i]);
       if (i < paths.length - 1) {
         sb.append(",");
       }
@@ -187,9 +188,9 @@ public class CombineFileSplit implements
     if (locations != null) {
       String locs = "";
       StringBuffer locsb = new StringBuffer();
-        for (String location : locations) {
-            locsb.append(location).append(":");
-        }
+      for (String location : locations) {
+        locsb.append(location).append(":");
+      }
       locs = locsb.toString();
       sb.append(" Locations:").append(locs).append("; ");
     }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/Counters.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/Counters.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/Counters.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/Counters.java Thu Feb 21 06:38:33 2013
@@ -614,7 +614,8 @@ public class Counters implements Writabl
     for (Group group : this) {
       sb.append("\n\t").append(group.getDisplayName());
       for (Counter counter : group) {
-        sb.append("\n\t\t").append(counter.getDisplayName()).append("=").append(counter.getCounter());
+        sb.append("\n\t\t").append(counter.getDisplayName()).append("=")
+            .append(counter.getCounter());
       }
     }
     return sb.toString();

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/Directive.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/Directive.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/Directive.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/Directive.java Thu Feb 21 06:38:33 2013
@@ -45,7 +45,7 @@ public class Directive implements Writab
     }
   }
 
-    public Directive() {
+  public Directive() {
   }
 
   public Directive(Directive.Type type) {

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java Thu Feb 21 06:38:33 2013
@@ -143,8 +143,8 @@ public abstract class FileInputFormat<K,
       } else {
         for (FileStatus globStat : matches) {
           if (globStat.isDir()) {
-              Collections.addAll(result, fs.listStatus(globStat.getPath(),
-                      inputFilter));
+            Collections.addAll(result,
+                fs.listStatus(globStat.getPath(), inputFilter));
           } else {
             result.add(globStat);
           }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java Thu Feb 21 06:38:33 2013
@@ -60,8 +60,8 @@ public abstract class FileOutputFormat<K
   public static void setOutputCompressorClass(BSPJob conf,
       Class<? extends CompressionCodec> codecClass) {
     setCompressOutput(conf, true);
-    conf.getConfiguration().setClass("bsp.output.compression.codec", codecClass,
-        CompressionCodec.class);
+    conf.getConfiguration().setClass("bsp.output.compression.codec",
+        codecClass, CompressionCodec.class);
   }
 
   /**

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Thu Feb 21 06:38:33 2013
@@ -33,7 +33,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -41,6 +40,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -98,7 +98,7 @@ public class GroomServer implements Runn
     NORMAL, COMPUTE, SYNC, BARRIER, STALE, INTERRUPTED, DENIED
   }
 
-    private HttpServer server;
+  private HttpServer server;
   private ZooKeeper zk = null;
 
   // Running States and its related things
@@ -276,17 +276,17 @@ public class GroomServer implements Runn
         LOG.debug("Got " + outOfContactTasks.size() + " oblivious tasks");
       }
 
-        for (TaskInProgress tip : outOfContactTasks) {
-            try {
-                LOG.debug("Purging task " + tip);
-                purgeTask(tip, true);
-            } catch (Exception e) {
-                LOG.error(
-                        new StringBuilder("Error while removing a timed-out task - ")
-                                .append(tip.toString()), e);
+      for (TaskInProgress tip : outOfContactTasks) {
+        try {
+          LOG.debug("Purging task " + tip);
+          purgeTask(tip, true);
+        } catch (Exception e) {
+          LOG.error(
+              new StringBuilder("Error while removing a timed-out task - ")
+                  .append(tip.toString()), e);
 
-            }
         }
+      }
       outOfContactTasks.clear();
 
     }
@@ -464,15 +464,15 @@ public class GroomServer implements Runn
     LOG.debug(localDirs);
 
     if (localDirs != null) {
-        for (String localDir : localDirs) {
-            try {
-                LOG.info(localDir);
-                DiskChecker.checkDir(new File(localDir));
-                writable = true;
-            } catch (DiskErrorException e) {
-                LOG.warn("BSP Processor local " + e.getMessage());
-            }
+      for (String localDir : localDirs) {
+        try {
+          LOG.info(localDir);
+          DiskChecker.checkDir(new File(localDir));
+          writable = true;
+        } catch (DiskErrorException e) {
+          LOG.warn("BSP Processor local " + e.getMessage());
         }
+      }
     }
 
     if (!writable)
@@ -485,18 +485,17 @@ public class GroomServer implements Runn
 
   public void deleteLocalFiles() throws IOException {
     String[] localDirs = getLocalDirs();
-      for (String localDir : localDirs) {
-          FileSystem.getLocal(this.conf).delete(new Path(localDir), true);
-      }
+    for (String localDir : localDirs) {
+      FileSystem.getLocal(this.conf).delete(new Path(localDir), true);
+    }
   }
 
   public void deleteLocalFiles(String subdir) throws IOException {
     try {
       String[] localDirs = getLocalDirs();
-        for (String localDir : localDirs) {
-            FileSystem.getLocal(this.conf).delete(new Path(localDir, subdir),
-                    true);
-        }
+      for (String localDir : localDirs) {
+        FileSystem.getLocal(this.conf).delete(new Path(localDir, subdir), true);
+      }
     } catch (NullPointerException e) {
       LOG.info(e);
     }
@@ -808,10 +807,10 @@ public class GroomServer implements Runn
       // Task is out of contact if it has not pinged since more than
       // monitorPeriod. A task is given a leeway of 10 times monitorPeriod
       // to get started.
-      
+
       // TODO Please refactor this conditions
-      // NOTE: (currentTime - tip.lastPingedTimestamp) > 6 * monitorPeriod 
- 
+      // NOTE: (currentTime - tip.lastPingedTimestamp) > 6 * monitorPeriod
+
       if (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING)
           && (((tip.lastPingedTimestamp == 0 && ((currentTime - tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp > 0) && (currentTime - tip.lastPingedTimestamp) > 6 * monitorPeriod)))) {
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java Thu Feb 21 06:38:33 2013
@@ -58,7 +58,7 @@ public abstract class GroomServerAction 
     UPDATE_PEER
   }
 
-    /**
+  /**
    * A factory-method to create objects of given {@link ActionType}.
    * 
    * @param actionType the {@link ActionType} of object to create.
@@ -79,17 +79,15 @@ public abstract class GroomServerAction 
       case KILL_JOB: {
         action = new KillJobAction();
       }
-      break;
-      case RECOVER_TASK:
-      {
+        break;
+      case RECOVER_TASK: {
         action = new RecoverTaskAction();
       }
-      break;
-      case UPDATE_PEER:
-      {
+        break;
+      case UPDATE_PEER: {
         action = new UpdatePeerAction();
       }
-      break;
+        break;
       case REINIT_GROOM: {
         action = new ReinitGroomAction();
       }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerStatus.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerStatus.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerStatus.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerStatus.java Thu Feb 21 06:38:33 2013
@@ -123,13 +123,13 @@ public class GroomServerStatus implement
    */
   public int countTasks() {
     int taskCount = 0;
-      for (TaskStatus ts : taskReports) {
-          TaskStatus.State state = ts.getRunState();
-          if (state == TaskStatus.State.RUNNING
-                  || state == TaskStatus.State.UNASSIGNED) {
-              taskCount++;
-          }
+    for (TaskStatus ts : taskReports) {
+      TaskStatus.State state = ts.getRunState();
+      if (state == TaskStatus.State.RUNNING
+          || state == TaskStatus.State.UNASSIGNED) {
+        taskCount++;
       }
+    }
 
     return taskCount;
   }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java Thu Feb 21 06:38:33 2013
@@ -120,13 +120,14 @@ public class JobInProgress {
   private TaskAllocationStrategy taskAllocationStrategy;
 
   private FaultTolerantMasterService faultToleranceService;
-  
+
   /**
    * Used only for unit tests.
+   * 
    * @param jobId
    * @param conf
    */
-  public JobInProgress(BSPJobID jobId, Configuration conf){
+  public JobInProgress(BSPJobID jobId, Configuration conf) {
     this.conf = conf;
     this.jobId = jobId;
     master = null;
@@ -161,8 +162,8 @@ public class JobInProgress {
     this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>(
         numBSPTasks + 10);
 
-    this.maxTaskAttempts = job.getConfiguration().getInt(Constants.MAX_TASK_ATTEMPTS,
-        Constants.DEFAULT_MAX_TASK_ATTEMPTS);
+    this.maxTaskAttempts = job.getConfiguration().getInt(
+        Constants.MAX_TASK_ATTEMPTS, Constants.DEFAULT_MAX_TASK_ATTEMPTS);
 
     this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(),
         job.getJobName());
@@ -300,9 +301,8 @@ public class JobInProgress {
 
     if (conf.getBoolean(Constants.FAULT_TOLERANCE_FLAG, false)) {
 
-      Class<?> ftClass = conf.getClass(Constants.FAULT_TOLERANCE_CLASS, 
-          AsyncRcvdMsgCheckpointImpl.class ,
-          BSPFaultTolerantService.class);
+      Class<?> ftClass = conf.getClass(Constants.FAULT_TOLERANCE_CLASS,
+          AsyncRcvdMsgCheckpointImpl.class, BSPFaultTolerantService.class);
       if (ftClass != null) {
         try {
           faultToleranceService = ((BSPFaultTolerantService<?>) ReflectionUtils
@@ -340,25 +340,25 @@ public class JobInProgress {
     Task result = null;
     BSPResource[] resources = new BSPResource[0];
 
-      for (TaskInProgress task : tasks) {
-          if (!task.isRunning() && !task.isComplete()) {
+    for (TaskInProgress task : tasks) {
+      if (!task.isRunning() && !task.isComplete()) {
 
-              String[] selectedGrooms = taskAllocationStrategy.selectGrooms(
-                      groomStatuses, taskCountInGroomMap, resources, task);
-              GroomServerStatus groomStatus = taskAllocationStrategy
-                      .getGroomToAllocate(groomStatuses, selectedGrooms,
-                              taskCountInGroomMap, resources, task);
-              if (groomStatus != null) {
-                  result = task.constructTask(groomStatus);
-              } else if (LOG.isDebugEnabled()) {
-                  LOG.debug("Could not find a groom to schedule task");
-              }
-              if (result != null) {
-                  updateGroomTaskDetails(task.getGroomServerStatus(), result);
-              }
-              break;
-          }
+        String[] selectedGrooms = taskAllocationStrategy.selectGrooms(
+            groomStatuses, taskCountInGroomMap, resources, task);
+        GroomServerStatus groomStatus = taskAllocationStrategy
+            .getGroomToAllocate(groomStatuses, selectedGrooms,
+                taskCountInGroomMap, resources, task);
+        if (groomStatus != null) {
+          result = task.constructTask(groomStatus);
+        } else if (LOG.isDebugEnabled()) {
+          LOG.debug("Could not find a groom to schedule task");
+        }
+        if (result != null) {
+          updateGroomTaskDetails(task.getGroomServerStatus(), result);
+        }
+        break;
       }
+    }
 
     counters.incrCounter(JobCounter.LAUNCHED_TASKS, 1L);
     return result;
@@ -542,9 +542,9 @@ public class JobInProgress {
       //
       // kill all TIPs.
       //
-        for (TaskInProgress task : tasks) {
-            task.kill();
-        }
+      for (TaskInProgress task : tasks) {
+        task.kill();
+      }
 
       garbageCollect();
     }
@@ -557,12 +557,12 @@ public class JobInProgress {
    */
   synchronized void garbageCollect() {
     try {
-      
-      if(LOG.isDebugEnabled()){
+
+      if (LOG.isDebugEnabled()) {
         LOG.debug("Removing " + localJobFile + " and " + localJarFile
             + " getJobFile = " + profile.getJobFile());
       }
-      
+
       // Definitely remove the local-disk copy of the job file
       if (localJobFile != null) {
         localFs.delete(localJobFile, true);
@@ -644,21 +644,19 @@ public class JobInProgress {
       return false;
 
     if (!faultToleranceService.isAlreadyRecovered(tip)) {
-      if(LOG.isDebugEnabled()){
+      if (LOG.isDebugEnabled()) {
         LOG.debug("Adding recovery task " + tip.getCurrentTaskAttemptId());
       }
       recoveryTasks.add(tip);
       status.setRunState(JobStatus.RECOVERING);
       return true;
-    }
-    else if(LOG.isDebugEnabled()){
+    } else if (LOG.isDebugEnabled()) {
       LOG.debug("Avoiding recovery task " + tip.getCurrentTaskAttemptId());
     }
     return false;
-    
+
   }
-  
-  
+
   /**
    * 
    * @return Returns the list of tasks in progress that has to be recovered.

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java Thu Feb 21 06:38:33 2013
@@ -40,9 +40,10 @@ abstract class JobInProgressListener {
    * @throws IOException
    */
   public abstract void jobRemoved(JobInProgress job) throws IOException;
-  
+
   /**
    * Invoked when a task in job has to be recovered by {@link BSPMaster}.
+   * 
    * @param job The job to which the task belongs to.
    * @param task that has to be recovered
    * @throws IOException

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java Thu Feb 21 06:38:33 2013
@@ -65,6 +65,7 @@ public class PartitioningRunner extends
     converter = ReflectionUtils.newInstance(conf.getClass(
         Constants.RUNTIME_PARTITION_RECORDCONVERTER,
         DefaultRecordConverter.class, RecordConverter.class), conf);
+    converter.setup(conf);
 
     if (conf.get(Constants.RUNTIME_PARTITIONING_DIR) == null) {
       this.partitionDir = new Path(inputDir + "/partitions");
@@ -82,6 +83,8 @@ public class PartitioningRunner extends
    */
   public static interface RecordConverter {
 
+    public void setup(Configuration conf);
+
     /**
      * Should return the Key-Value pair constructed from the input format.
      * 
@@ -94,9 +97,10 @@ public class PartitioningRunner extends
         KeyValuePair<Writable, Writable> inputRecord, Configuration conf);
 
     public int getPartitionId(KeyValuePair<Writable, Writable> inputRecord,
-        @SuppressWarnings("rawtypes") Partitioner partitioner,
-        Configuration conf, @SuppressWarnings("rawtypes") BSPPeer peer,
-        int numTasks);
+        @SuppressWarnings("rawtypes")
+        Partitioner partitioner, Configuration conf,
+        @SuppressWarnings("rawtypes")
+        BSPPeer peer, int numTasks);
   }
 
   /**
@@ -113,12 +117,18 @@ public class PartitioningRunner extends
     @SuppressWarnings("unchecked")
     @Override
     public int getPartitionId(KeyValuePair<Writable, Writable> outputRecord,
-        @SuppressWarnings("rawtypes") Partitioner partitioner,
-        Configuration conf, @SuppressWarnings("rawtypes") BSPPeer peer,
-        int numTasks) {
+        @SuppressWarnings("rawtypes")
+        Partitioner partitioner, Configuration conf,
+        @SuppressWarnings("rawtypes")
+        BSPPeer peer, int numTasks) {
       return Math.abs(partitioner.getPartition(outputRecord.getKey(),
           outputRecord.getValue(), numTasks));
     }
+
+    @Override
+    public void setup(Configuration conf) {
+
+    }
   }
 
   @Override
@@ -159,7 +169,7 @@ public class PartitioningRunner extends
       }
       values.get(index).put(outputPair.getKey(), outputPair.getValue());
     }
-    
+
     // The reason of use of Memory is to reduce file opens
     for (Map.Entry<Integer, Map<Writable, Writable>> e : values.entrySet()) {
       Path destFile = new Path(partitionDir + "/part-" + e.getKey() + "/file-"
@@ -177,55 +187,55 @@ public class PartitioningRunner extends
     FileStatus[] status = fs.listStatus(partitionDir);
     // Call sync() one more time to avoid concurrent access
     peer.sync();
-    
+
     // merge files into one.
     // TODO if we use header info, we might able to merge files without full
     // scan.
-      for (FileStatus statu : status) {
-          int partitionID = Integer.parseInt(statu.getPath().getName()
-                  .split("[-]")[1]);
-          int denom = desiredNum / peer.getNumPeers();
-          int assignedID = partitionID;
-          if (denom > 1) {
-              assignedID = partitionID / denom;
-          }
-
-          if (assignedID == peer.getNumPeers())
-              assignedID = assignedID - 1;
+    for (FileStatus statu : status) {
+      int partitionID = Integer
+          .parseInt(statu.getPath().getName().split("[-]")[1]);
+      int denom = desiredNum / peer.getNumPeers();
+      int assignedID = partitionID;
+      if (denom > 1) {
+        assignedID = partitionID / denom;
+      }
+
+      if (assignedID == peer.getNumPeers())
+        assignedID = assignedID - 1;
+
+      // TODO set replica factor to 1.
+      // TODO and check whether we can write to specific DataNode.
+      if (assignedID == peer.getPeerIndex()) {
+        Path partitionFile = new Path(partitionDir + "/"
+            + getPartitionName(partitionID));
+
+        FileStatus[] files = fs.listStatus(statu.getPath());
+        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
+            partitionFile, outputKeyClass, outputValueClass,
+            CompressionType.NONE);
+
+        for (int i = 0; i < files.length; i++) {
+          LOG.debug("merge '" + files[i].getPath() + "' into " + partitionDir
+              + "/" + getPartitionName(partitionID));
+
+          SequenceFile.Reader reader = new SequenceFile.Reader(fs,
+              files[i].getPath(), conf);
+
+          Writable key = (Writable) ReflectionUtils.newInstance(outputKeyClass,
+              conf);
+          Writable value = (Writable) ReflectionUtils.newInstance(
+              outputValueClass, conf);
 
-          // TODO set replica factor to 1.
-          // TODO and check whether we can write to specific DataNode.
-          if (assignedID == peer.getPeerIndex()) {
-              Path partitionFile = new Path(partitionDir + "/"
-                      + getPartitionName(partitionID));
-
-              FileStatus[] files = fs.listStatus(statu.getPath());
-              SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
-                      partitionFile, outputKeyClass, outputValueClass,
-                      CompressionType.NONE);
-
-              for (int i = 0; i < files.length; i++) {
-                  LOG.debug("merge '" + files[i].getPath() + "' into " + partitionDir
-                          + "/" + getPartitionName(partitionID));
-
-                  SequenceFile.Reader reader = new SequenceFile.Reader(fs,
-                          files[i].getPath(), conf);
-
-                  Writable key = (Writable) ReflectionUtils.newInstance(outputKeyClass,
-                          conf);
-                  Writable value = (Writable) ReflectionUtils.newInstance(
-                          outputValueClass, conf);
-
-                  while (reader.next(key, value)) {
-                      writer.append(key, value);
-                  }
-                  reader.close();
-              }
-
-              writer.close();
-              fs.delete(statu.getPath(), true);
+          while (reader.next(key, value)) {
+            writer.append(key, value);
           }
+          reader.close();
+        }
+
+        writer.close();
+        fs.delete(statu.getPath(), true);
       }
+    }
   }
 
   @SuppressWarnings("rawtypes")

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/QueueManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/QueueManager.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/QueueManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/QueueManager.java Thu Feb 21 06:38:33 2013
@@ -18,9 +18,8 @@
 package org.apache.hama.bsp;
 
 import java.io.IOException;
-
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.conf.Configuration;
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java Thu Feb 21 06:38:33 2013
@@ -45,8 +45,8 @@ public class RecoverTaskAction extends G
   public Task getTask() {
     return task;
   }
-  
-  public long getSuperstepCount(){
+
+  public long getSuperstepCount() {
     return superstepNumber.get();
   }
 
@@ -54,7 +54,7 @@ public class RecoverTaskAction extends G
   public void write(DataOutput out) throws IOException {
     task.write(out);
     superstepNumber.write(out);
-    
+
   }
 
   @Override

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java Thu Feb 21 06:38:33 2013
@@ -251,16 +251,16 @@ class SimpleTaskScheduler extends TaskSc
       }
 
       // assembly into actions
-        for (Task task : taskSet) {
-            GroomServerStatus groomStatus = jip.getGroomStatusForTask(task);
-            List<GroomServerAction> taskActions = actionMap.get(groomStatus);
-            if (taskActions == null) {
-                taskActions = new ArrayList<GroomServerAction>(
-                        groomStatus.getMaxTasks());
-            }
-            taskActions.add(new LaunchTaskAction(task));
-            actionMap.put(groomStatus, taskActions);
+      for (Task task : taskSet) {
+        GroomServerStatus groomStatus = jip.getGroomStatusForTask(task);
+        List<GroomServerAction> taskActions = actionMap.get(groomStatus);
+        if (taskActions == null) {
+          taskActions = new ArrayList<GroomServerAction>(
+              groomStatus.getMaxTasks());
         }
+        taskActions.add(new LaunchTaskAction(task));
+        actionMap.put(groomStatus, taskActions);
+      }
 
       sendDirectivesToGrooms(actionMap);
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java Thu Feb 21 06:38:33 2013
@@ -29,7 +29,7 @@ public class TaskCompletionEvent impleme
     FAILED, KILLED, SUCCEEDED, OBSOLETE, TIPFAILED
   }
 
-    private int eventId;
+  private int eventId;
   private String groomServerInfo;
   private int taskRunTime; // using int since runtime is the time difference
   private TaskAttemptID taskId;

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java Thu Feb 21 06:38:33 2013
@@ -18,11 +18,10 @@
 package org.apache.hama.bsp;
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -68,7 +67,7 @@ public class TaskInProgress {
 
   // The first taskid of this tip
   private TaskAttemptID firstTaskId;
-  
+
   private TaskAttemptID currentTaskId;
 
   // Map from task Id -> GroomServer Id, contains tasks that are
@@ -145,17 +144,17 @@ public class TaskInProgress {
       Map<GroomServerStatus, Integer> tasksInGroomMap,
       String[] possibleLocations) {
 
-      for (String location : possibleLocations) {
-          GroomServerStatus groom = grooms.get(location);
-          if (groom == null)
-              continue;
-          Integer taskInGroom = tasksInGroomMap.get(groom);
-          taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
-          if (taskInGroom < groom.getMaxTasks()
-                  && location.equals(groom.getGroomHostName())) {
-              return groom.getGroomHostName();
-          }
+    for (String location : possibleLocations) {
+      GroomServerStatus groom = grooms.get(location);
+      if (groom == null)
+        continue;
+      Integer taskInGroom = tasksInGroomMap.get(groom);
+      taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
+      if (taskInGroom < groom.getMaxTasks()
+          && location.equals(groom.getGroomHostName())) {
+        return groom.getGroomHostName();
       }
+    }
     return null;
   }
 
@@ -168,16 +167,16 @@ public class TaskInProgress {
   private String getAnyGroomToSchedule(Map<String, GroomServerStatus> grooms,
       Map<GroomServerStatus, Integer> tasksInGroomMap) {
 
-      for (String s : grooms.keySet()) {
-          GroomServerStatus groom = grooms.get(s);
-          if (groom == null)
-              continue;
-          Integer taskInGroom = tasksInGroomMap.get(groom);
-          taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
-          if (taskInGroom < groom.getMaxTasks()) {
-              return groom.getGroomHostName();
-          }
+    for (String s : grooms.keySet()) {
+      GroomServerStatus groom = grooms.get(s);
+      if (groom == null)
+        continue;
+      Integer taskInGroom = tasksInGroomMap.get(groom);
+      taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
+      if (taskInGroom < groom.getMaxTasks()) {
+        return groom.getGroomHostName();
       }
+    }
     return null;
   }
 
@@ -188,7 +187,7 @@ public class TaskInProgress {
    * @return
    */
   public Task constructTask(GroomServerStatus groomStatus) {
-    if(groomStatus == null){
+    if (groomStatus == null) {
       return null;
     }
     TaskAttemptID taskId = computeTaskId();
@@ -198,8 +197,8 @@ public class TaskInProgress {
       String splitClass = null;
       BytesWritable split = null;
       if (rawSplit != null) {
-    	  splitClass = rawSplit.getClassName();
-    	  split = rawSplit.getBytes();
+        splitClass = rawSplit.getClassName();
+        split = rawSplit.getBytes();
       }
       currentTaskId = taskId;
       String groomName = groomStatus.getGroomHostName();
@@ -218,8 +217,8 @@ public class TaskInProgress {
     String splitClass = null;
     BytesWritable split = null;
     if (rawSplit != null) {
-  	  splitClass = rawSplit.getClassName();
-  	  split = rawSplit.getBytes();
+      splitClass = rawSplit.getClassName();
+      split = rawSplit.getBytes();
     }
     Task t = null;
     String groomName = getGroomToSchedule(taskid, grooms, tasksInGroomMap,
@@ -492,8 +491,8 @@ public class TaskInProgress {
   public RawSplit getFileSplit() {
     return this.rawSplit;
   }
-  
-  public TaskAttemptID getCurrentTaskAttemptId(){
+
+  public TaskAttemptID getCurrentTaskAttemptId() {
     return this.currentTaskId;
   }
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java Thu Feb 21 06:38:33 2013
@@ -112,9 +112,9 @@ public class TaskLog {
     File[] oldTaskLogs = LOG_DIR.listFiles(new TaskLogsPurgeFilter(
         purgeTimeStamp));
     if (oldTaskLogs != null) {
-        for (File oldTaskLog : oldTaskLogs) {
-            FileUtil.fullyDelete(oldTaskLog);
-        }
+      for (File oldTaskLog : oldTaskLogs) {
+        FileUtil.fullyDelete(oldTaskLog);
+      }
     }
   }
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java Thu Feb 21 06:38:33 2013
@@ -123,7 +123,7 @@ public class TaskRunner extends Thread {
 
         int exit_code = bspProcess.waitFor();
         if (!bspKilled && exit_code != 0) {
-          
+
           throw new IOException("BSP task process exit with nonzero status of "
               + exit_code + ". command = " + commands);
         }
@@ -181,11 +181,11 @@ public class TaskRunner extends Thread {
       }
       File[] libs = new File(workDir, "lib").listFiles();
       if (libs != null) {
-          for (File lib : libs) {
-              // add libs from jar to classpath
-              classPath.append(SYSTEM_PATH_SEPARATOR);
-              classPath.append(lib);
-          }
+        for (File lib : libs) {
+          // add libs from jar to classpath
+          classPath.append(SYSTEM_PATH_SEPARATOR);
+          classPath.append(lib);
+        }
       }
       classPath.append(SYSTEM_PATH_SEPARATOR);
       classPath.append(new File(workDir, "classes"));
@@ -204,11 +204,12 @@ public class TaskRunner extends Thread {
     vargs.add(jvm.toString());
 
     // bsp.child.java.opts
-    String javaOpts = jobConf.getConfiguration().get("bsp.child.java.opts", "-Xmx200m");
+    String javaOpts = jobConf.getConfiguration().get("bsp.child.java.opts",
+        "-Xmx200m");
     javaOpts = javaOpts.replace("@taskid@", task.getTaskID().toString());
 
     String[] javaOptsSplit = javaOpts.split(" ");
-      Collections.addAll(vargs, javaOptsSplit);
+    Collections.addAll(vargs, javaOptsSplit);
 
     // Add classpath.
     vargs.add("-classpath");
@@ -225,15 +226,14 @@ public class TaskRunner extends Thread {
       vargs.add(groomServer.groomHostName);
       vargs.add(Long.toString(groomServer.getStartSuperstep(task.getTaskID())));
       TaskStatus status = groomServer.getTaskStatus(task.getTaskID());
-      
-      if(status != null && 
-          TaskStatus.State.RECOVERING.equals(status.getRunState())){
+
+      if (status != null
+          && TaskStatus.State.RECOVERING.equals(status.getRunState())) {
         vargs.add(TaskStatus.State.RECOVERING.name());
-      }
-      else{
+      } else {
         vargs.add(TaskStatus.State.RUNNING.name());
       }
-      
+
     }
     return vargs;
   }
@@ -296,7 +296,7 @@ public class TaskRunner extends Thread {
     if (bspProcess != null) {
       bspProcess.destroy();
     }
-    
+
   }
 
   /**

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/TextArrayWritable.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextArrayWritable.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/TextArrayWritable.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/TextArrayWritable.java Thu Feb 21 06:38:33 2013
@@ -26,4 +26,4 @@ public class TextArrayWritable extends A
     super(Text.class);
   }
 
-}
\ No newline at end of file
+}

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/UpdatePeerAction.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/UpdatePeerAction.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/UpdatePeerAction.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/UpdatePeerAction.java Thu Feb 21 06:38:33 2013
@@ -24,21 +24,21 @@ import java.io.IOException;
 import org.apache.hadoop.io.Text;
 
 /**
- * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} 
- * to the {@link org.apache.hama.bsp.GroomServer} to kill a task.
+ * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} to the
+ * {@link org.apache.hama.bsp.GroomServer} to kill a task.
  */
 class UpdatePeerAction extends GroomServerAction {
   TaskAttemptID taskId;
   TaskAttemptID peerTaskId;
   Text groomName;
-  
+
   public UpdatePeerAction() {
     super(ActionType.UPDATE_PEER);
     taskId = new TaskAttemptID();
     groomName = new Text("");
   }
-  
-  public UpdatePeerAction(TaskAttemptID taskId, TaskAttemptID peerTaskId, 
+
+  public UpdatePeerAction(TaskAttemptID taskId, TaskAttemptID peerTaskId,
       String groom) {
     super(ActionType.UPDATE_PEER);
     this.taskId = taskId;
@@ -49,15 +49,15 @@ class UpdatePeerAction extends GroomServ
   public TaskAttemptID getTaskID() {
     return taskId;
   }
-  
-  public TaskAttemptID getPeerTaskID(){
+
+  public TaskAttemptID getPeerTaskID() {
     return peerTaskId;
   }
-  
-  public String getGroomName(){
+
+  public String getGroomName() {
     return groomName.toString();
   }
-  
+
   @Override
   public void write(DataOutput out) throws IOException {
     taskId.write(out);