You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text rendering).
Posted to commits@hama.apache.org by ed...@apache.org on 2013/05/12 06:02:24 UTC

svn commit: r1481469 - in /hama/trunk: CHANGES.txt core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java core/src/test/java/org/apache/hama/bsp/TestPartitioning.java

Author: edwardyoon
Date: Sun May 12 04:02:23 2013
New Revision: 1481469

URL: http://svn.apache.org/r1481469
Log:
HAMA-756: Timing issue and file merging algorithm in PartitioningRunner make job fail

Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1481469&r1=1481468&r2=1481469&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Sun May 12 04:02:23 2013
@@ -8,6 +8,7 @@ Release 0.7 (unreleased changes)
 
   BUG FIXES
 
+   HAMA-756: Timing issue and file merging algorithm in PartitioningRunner make job fail (MaoYuan Xian via edwardyoon)
    HAMA-755: registerJob of ZKSyncBSPMasterClient makes a useless node in zookeeper (MaoYuan Xian via edwardyoon)
    HAMA-750: Fix bug of comma separated input paths (edwardyoon)
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1481469&r1=1481468&r2=1481469&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java Sun May 12 04:02:23 2013
@@ -191,31 +191,24 @@ public class PartitioningRunner extends
 
     peer.sync();
     FileStatus[] status = fs.listStatus(partitionDir);
+    // To avoid race condition, we should store the peer number
+    int peerNum = peer.getNumPeers();
     // Call sync() one more time to avoid concurrent access
     peer.sync();
 
     // merge files into one.
     // TODO if we use header info, we might able to merge files without full
     // scan.
-    for (FileStatus statu : status) {
+    for (FileStatus stat : status) {
       int partitionID = Integer
-          .parseInt(statu.getPath().getName().split("[-]")[1]);
-      int denom = desiredNum / peer.getNumPeers();
-      int assignedID = partitionID;
-      if (denom > 1) {
-        assignedID = partitionID / denom;
-      }
-
-      if (assignedID == peer.getNumPeers())
-        assignedID = assignedID - 1;
+          .parseInt(stat.getPath().getName().split("[-]")[1]);
 
       // TODO set replica factor to 1.
-      // TODO and check whether we can write to specific DataNode.
-      if (assignedID == peer.getPeerIndex()) {
+      if (getMergeProcessorID(partitionID, peerNum) == peer.getPeerIndex()) {
         Path partitionFile = new Path(partitionDir + "/"
             + getPartitionName(partitionID));
 
-        FileStatus[] files = fs.listStatus(statu.getPath());
+        FileStatus[] files = fs.listStatus(stat.getPath());
         SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
             partitionFile, outputKeyClass, outputValueClass,
             CompressionType.NONE);
@@ -239,11 +232,15 @@ public class PartitioningRunner extends
         }
 
         writer.close();
-        fs.delete(statu.getPath(), true);
+        fs.delete(stat.getPath(), true);
       }
     }
   }
 
+  public static int getMergeProcessorID(int partitionID, int peerNum) {
+    return partitionID % peerNum;
+  }
+
   @SuppressWarnings("rawtypes")
   public Partitioner getPartitioner() {
     return ReflectionUtils.newInstance(conf.getClass(

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java?rev=1481469&r1=1481468&r2=1481469&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java Sun May 12 04:02:23 2013
@@ -90,6 +90,15 @@ public class TestPartitioning extends Ha
 
     FileSystem fs = FileSystem.get(conf);
     fs.delete(OUTPUT_PATH, true);
+    
+    getMergeProcessorID();
+  }
+
+  public void getMergeProcessorID() {
+    int peerNum = 6;
+    for (int partitionID = 0; partitionID < 8; partitionID++) {
+      assertTrue(PartitioningRunner.getMergeProcessorID(partitionID, peerNum) < peerNum);
+    }
   }
 
   public static class PartionedBSP extends