You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2013/02/05 08:11:03 UTC
svn commit: r1442482 - in /hama/trunk:
core/src/main/java/org/apache/hama/bsp/
examples/src/test/java/org/apache/hama/examples/
graph/src/test/java/org/apache/hama/graph/
Author: edwardyoon
Date: Tue Feb 5 07:11:03 2013
New Revision: 1442482
URL: http://svn.apache.org/viewvc?rev=1442482&view=rev
Log:
HAMA-730: Concurrent file access in PartitioningRunner
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java
hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1442482&r1=1442481&r2=1442482&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java Tue Feb 5 07:11:03 2013
@@ -159,7 +159,7 @@ public class PartitioningRunner extends
}
values.get(index).put(outputPair.getKey(), outputPair.getValue());
}
-
+
// The reason of use of Memory is to reduce file opens
for (Map.Entry<Integer, Map<Writable, Writable>> e : values.entrySet()) {
Path destFile = new Path(partitionDir + "/part-" + e.getKey() + "/file-"
@@ -174,11 +174,13 @@ public class PartitioningRunner extends
}
peer.sync();
-
+ FileStatus[] status = fs.listStatus(partitionDir);
+ // Call sync() one more time to avoid concurrent access
+ peer.sync();
+
// merge files into one.
// TODO if we use header info, we might able to merge files without full
// scan.
- FileStatus[] status = fs.listStatus(partitionDir);
for (int j = 0; j < status.length; j++) {
int partitionID = Integer.parseInt(status[j].getPath().getName()
.split("[-]")[1]);
Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java?rev=1442482&r1=1442481&r2=1442482&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java Tue Feb 5 07:11:03 2013
@@ -146,6 +146,8 @@ public class BipartiteMatchingTest exten
fs.delete(new Path(INPUT), true);
if (fs.exists(new Path(OUTPUT)))
fs.delete(new Path(OUTPUT), true);
+ if (fs.exists(new Path("/tmp/partitions")))
+ fs.delete(new Path("/tmp/partitions"), true);
} catch (IOException e) {
e.printStackTrace();
}
Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java?rev=1442482&r1=1442481&r2=1442482&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java Tue Feb 5 07:11:03 2013
@@ -34,7 +34,7 @@ import org.apache.hama.graph.GraphJobRun
public class PageRankTest extends TestCase {
- private static String INPUT = "/tmp/pagerank/pagerank-tmp.seq";
+ private static String INPUT = "/tmp/pagerank/";
private static String TEXT_INPUT = "/tmp/pagerank/pagerank.txt";
private static String TEXT_OUTPUT = INPUT + "pagerank.txt.seq";
@@ -86,7 +86,7 @@ public class PageRankTest extends TestCa
private void generateTestData() throws ClassNotFoundException,
InterruptedException, IOException {
- SymmetricMatrixGen.main(new String[] { "40", "10", INPUT, "2" });
+ SymmetricMatrixGen.main(new String[] { "40", "10", INPUT, "20" });
}
private void deleteTempDirs() {
Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1442482&r1=1442481&r2=1442482&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java Tue Feb 5 07:11:03 2013
@@ -159,11 +159,11 @@ public class TestSubmitGraphJob extends
e.printStackTrace();
}
}
-
+
private void deleteTempDirs() {
try {
- if (fs.exists(new Path(INPUT)))
- fs.delete(new Path(INPUT), true);
+ if (fs.exists(new Path(INPUT).getParent()))
+ fs.delete(new Path(INPUT).getParent(), true);
if (fs.exists(new Path(OUTPUT)))
fs.delete(new Path(OUTPUT), true);
} catch (IOException e) {