You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2013/02/05 08:11:03 UTC

svn commit: r1442482 - in /hama/trunk: core/src/main/java/org/apache/hama/bsp/ examples/src/test/java/org/apache/hama/examples/ graph/src/test/java/org/apache/hama/graph/

Author: edwardyoon
Date: Tue Feb  5 07:11:03 2013
New Revision: 1442482

URL: http://svn.apache.org/viewvc?rev=1442482&view=rev
Log:
HAMA-730: Concurrent file access in PartitioningRunner

Modified:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1442482&r1=1442481&r2=1442482&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java Tue Feb  5 07:11:03 2013
@@ -159,7 +159,7 @@ public class PartitioningRunner extends
       }
       values.get(index).put(outputPair.getKey(), outputPair.getValue());
     }
-
+    
     // The reason of use of Memory is to reduce file opens
     for (Map.Entry<Integer, Map<Writable, Writable>> e : values.entrySet()) {
       Path destFile = new Path(partitionDir + "/part-" + e.getKey() + "/file-"
@@ -174,11 +174,13 @@ public class PartitioningRunner extends
     }
 
     peer.sync();
-
+    FileStatus[] status = fs.listStatus(partitionDir);
+    // Call sync() one more time to avoid concurrent access
+    peer.sync();
+    
     // merge files into one.
     // TODO if we use header info, we might able to merge files without full
     // scan.
-    FileStatus[] status = fs.listStatus(partitionDir);
     for (int j = 0; j < status.length; j++) {
       int partitionID = Integer.parseInt(status[j].getPath().getName()
           .split("[-]")[1]);

Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java?rev=1442482&r1=1442481&r2=1442482&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java Tue Feb  5 07:11:03 2013
@@ -146,6 +146,8 @@ public class BipartiteMatchingTest exten
         fs.delete(new Path(INPUT), true);
       if (fs.exists(new Path(OUTPUT)))
         fs.delete(new Path(OUTPUT), true);
+      if (fs.exists(new Path("/tmp/partitions")))
+        fs.delete(new Path("/tmp/partitions"), true);
     } catch (IOException e) {
       e.printStackTrace();
     }

Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java?rev=1442482&r1=1442481&r2=1442482&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java Tue Feb  5 07:11:03 2013
@@ -34,7 +34,7 @@ import org.apache.hama.graph.GraphJobRun
 
 public class PageRankTest extends TestCase {
 
-  private static String INPUT = "/tmp/pagerank/pagerank-tmp.seq";
+  private static String INPUT = "/tmp/pagerank/";
   private static String TEXT_INPUT = "/tmp/pagerank/pagerank.txt";
   private static String TEXT_OUTPUT = INPUT + "pagerank.txt.seq";
 
@@ -86,7 +86,7 @@ public class PageRankTest extends TestCa
 
   private void generateTestData() throws ClassNotFoundException,
       InterruptedException, IOException {
-    SymmetricMatrixGen.main(new String[] { "40", "10", INPUT, "2" });
+    SymmetricMatrixGen.main(new String[] { "40", "10", INPUT, "20" });
   }
 
   private void deleteTempDirs() {

Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1442482&r1=1442481&r2=1442482&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java Tue Feb  5 07:11:03 2013
@@ -159,11 +159,11 @@ public class TestSubmitGraphJob extends 
       e.printStackTrace();
     }
   }
-  
+
   private void deleteTempDirs() {
     try {
-      if (fs.exists(new Path(INPUT)))
-        fs.delete(new Path(INPUT), true);
+      if (fs.exists(new Path(INPUT).getParent()))
+        fs.delete(new Path(INPUT).getParent(), true);
       if (fs.exists(new Path(OUTPUT)))
         fs.delete(new Path(OUTPUT), true);
     } catch (IOException e) {