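You are viewing a plain-text version of this content. The canonical (HTML) version is available from the original mailing-list archive page; the hyperlink was lost in this plain-text conversion.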
Posted to commits@hama.apache.org by ed...@apache.org on 2013/05/12 06:02:24 UTC
svn commit: r1481469 - in /hama/trunk: CHANGES.txt
core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
core/src/test/java/org/apache/hama/bsp/TestPartitioning.java
Author: edwardyoon
Date: Sun May 12 04:02:23 2013
New Revision: 1481469
URL: http://svn.apache.org/r1481469
Log:
HAMA-756: Timing issue and file merging algorithm in PartitioningRunner make job fail
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java
Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1481469&r1=1481468&r2=1481469&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Sun May 12 04:02:23 2013
@@ -8,6 +8,7 @@ Release 0.7 (unreleased changes)
BUG FIXES
+ HAMA-756: Timing issue and file merging algorithm in PartitioningRunner make job fail (MaoYuan Xian via edwardyoon)
HAMA-755: registerJob of ZKSyncBSPMasterClient makes a useless node in zookeeper (MaoYuan Xian via edwardyoon)
HAMA-750: Fix bug of comma separated input paths (edwardyoon)
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1481469&r1=1481468&r2=1481469&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java Sun May 12 04:02:23 2013
@@ -191,31 +191,24 @@ public class PartitioningRunner extends
peer.sync();
FileStatus[] status = fs.listStatus(partitionDir);
+ // To avoid race condition, we should store the peer number
+ int peerNum = peer.getNumPeers();
// Call sync() one more time to avoid concurrent access
peer.sync();
// merge files into one.
// TODO if we use header info, we might able to merge files without full
// scan.
- for (FileStatus statu : status) {
+ for (FileStatus stat : status) {
int partitionID = Integer
- .parseInt(statu.getPath().getName().split("[-]")[1]);
- int denom = desiredNum / peer.getNumPeers();
- int assignedID = partitionID;
- if (denom > 1) {
- assignedID = partitionID / denom;
- }
-
- if (assignedID == peer.getNumPeers())
- assignedID = assignedID - 1;
+ .parseInt(stat.getPath().getName().split("[-]")[1]);
// TODO set replica factor to 1.
- // TODO and check whether we can write to specific DataNode.
- if (assignedID == peer.getPeerIndex()) {
+ if (getMergeProcessorID(partitionID, peerNum) == peer.getPeerIndex()) {
Path partitionFile = new Path(partitionDir + "/"
+ getPartitionName(partitionID));
- FileStatus[] files = fs.listStatus(statu.getPath());
+ FileStatus[] files = fs.listStatus(stat.getPath());
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
partitionFile, outputKeyClass, outputValueClass,
CompressionType.NONE);
@@ -239,11 +232,15 @@ public class PartitioningRunner extends
}
writer.close();
- fs.delete(statu.getPath(), true);
+ fs.delete(stat.getPath(), true);
}
}
}
+ public static int getMergeProcessorID(int partitionID, int peerNum) {
+ return partitionID % peerNum;
+ }
+
@SuppressWarnings("rawtypes")
public Partitioner getPartitioner() {
return ReflectionUtils.newInstance(conf.getClass(
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java?rev=1481469&r1=1481468&r2=1481469&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java Sun May 12 04:02:23 2013
@@ -90,6 +90,15 @@ public class TestPartitioning extends Ha
FileSystem fs = FileSystem.get(conf);
fs.delete(OUTPUT_PATH, true);
+
+ getMergeProcessorID();
+ }
+
+ /**
+ * Checks that PartitioningRunner.getMergeProcessorID maps every partition
+ * onto a valid peer index in [0, peerNum), and that the index equals
+ * partitionID modulo peerNum, for several peer counts (including more
+ * partitions than peers).
+ */
+ public void getMergeProcessorID() {
+   for (int peerNum = 1; peerNum <= 6; peerNum++) {
+     for (int partitionID = 0; partitionID < 8; partitionID++) {
+       int processorID = PartitioningRunner.getMergeProcessorID(partitionID,
+           peerNum);
+       assertTrue(processorID >= 0 && processorID < peerNum);
+       assertEquals(partitionID % peerNum, processorID);
+     }
+   }
}
public static class PartionedBSP extends