You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2013/01/19 01:29:45 UTC
svn commit: r1435426 - in /hama/trunk: ./
core/src/main/java/org/apache/hama/bsp/
examples/src/test/java/org/apache/hama/examples/
Author: edwardyoon
Date: Sat Jan 19 00:29:45 2013
New Revision: 1435426
URL: http://svn.apache.org/viewvc?rev=1435426&view=rev
Log:
Splits should be assigned based on PartitionID
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
hama/trunk/examples/src/test/java/org/apache/hama/examples/SSSPTest.java
Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1435426&r1=1435425&r2=1435426&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Sat Jan 19 00:29:45 2013
@@ -10,7 +10,8 @@ Release 0.7 (unreleased changes)
HAMA-658: Add random symmetric sparse matrix generator (edwardyoon)
BUG FIXES
-
+
+ HAMA-716: Splits should be assigned based on PartitionID (edwardyoon)
HAMA-712: PartitioningRunner should works for multiple input files (surajsmenon via edwardyoon)
IMPROVEMENTS
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1435426&r1=1435425&r2=1435426&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Sat Jan 19 00:29:45 2013
@@ -555,6 +555,17 @@ public class BSPJobClient extends Config
DataOutputBuffer buffer = new DataOutputBuffer();
RawSplit rawSplit = new RawSplit();
for (InputSplit split : splits) {
+
+ // set partitionID to rawSplit
+ if (split.getClass().getName().equals(FileSplit.class.getName())
+ && job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_CLASS) != null
+ && job.get("bsp.partitioning.runner.job") == null) {
+ LOG.debug(((FileSplit) split).getPath().getName());
+ String[] extractPartitionID = ((FileSplit) split).getPath().getName()
+ .toString().split("[-]");
+ rawSplit.setPartitionID(Integer.parseInt(extractPartitionID[1]));
+ }
+
rawSplit.setClassName(split.getClass().getName());
buffer.reset();
split.write(buffer);
@@ -604,8 +615,12 @@ public class BSPJobClient extends Config
int len = WritableUtils.readVInt(in);
RawSplit[] result = new RawSplit[len];
for (int i = 0; i < len; ++i) {
- result[i] = new RawSplit();
- result[i].readFields(in);
+ RawSplit split = new RawSplit();
+ split.readFields(in);
+ if (split.getPartitionID() != Integer.MIN_VALUE)
+ result[split.getPartitionID()] = split;
+ else
+ result[i] = split;
}
return result;
}
@@ -1050,6 +1065,7 @@ public class BSPJobClient extends Config
private String splitClass;
private BytesWritable bytes = new BytesWritable();
private String[] locations;
+ private int partitionID = Integer.MIN_VALUE;
long dataLength;
public void setBytes(byte[] data, int offset, int length) {
@@ -1060,6 +1076,14 @@ public class BSPJobClient extends Config
splitClass = className;
}
+ public void setPartitionID(int id) {
+ this.partitionID = id;
+ }
+
+ public int getPartitionID() {
+ return partitionID;
+ }
+
public String getClassName() {
return splitClass;
}
@@ -1084,6 +1108,7 @@ public class BSPJobClient extends Config
public void readFields(DataInput in) throws IOException {
splitClass = Text.readString(in);
dataLength = in.readLong();
+ partitionID = in.readInt();
bytes.readFields(in);
int len = WritableUtils.readVInt(in);
locations = new String[len];
@@ -1096,6 +1121,7 @@ public class BSPJobClient extends Config
public void write(DataOutput out) throws IOException {
Text.writeString(out, splitClass);
out.writeLong(dataLength);
+ out.writeInt(partitionID);
bytes.write(out);
WritableUtils.writeVInt(out, locations.length);
for (int i = 0; i < locations.length; i++) {
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1435426&r1=1435425&r2=1435426&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Sat Jan 19 00:29:45 2013
@@ -230,7 +230,8 @@ public class LocalBSPRunner implements J
conf.set(Constants.PEER_HOST, "local");
bsp = (BSP) ReflectionUtils.newInstance(
- job.getConfiguration().getClass("bsp.work.class", BSP.class), job.getConfiguration());
+ job.getConfiguration().getClass("bsp.work.class", BSP.class),
+ job.getConfiguration());
}
@@ -240,6 +241,7 @@ public class LocalBSPRunner implements J
String splitname = null;
BytesWritable realBytes = null;
if (splits != null) {
+ LOG.debug(id + ", " + splits[id].getPartitionID());
splitname = splits[id].getClassName();
realBytes = splits[id].getBytes();
}
Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java?rev=1435426&r1=1435425&r2=1435426&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java Sat Jan 19 00:29:45 2013
@@ -60,7 +60,7 @@ public class MindistSearchTest extends T
public void testMindistSearch() throws Exception {
generateTestData();
try {
- MindistSearch.main(new String[] { INPUT, OUTPUT, "30", "1" });
+ MindistSearch.main(new String[] { INPUT, OUTPUT, "30", "3" });
verifyResult();
} finally {
Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/SSSPTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/SSSPTest.java?rev=1435426&r1=1435425&r2=1435426&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/SSSPTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/SSSPTest.java Sat Jan 19 00:29:45 2013
@@ -37,9 +37,17 @@ import org.apache.hama.HamaConfiguration
* Testcase for {@link ShortestPaths}
*/
public class SSSPTest extends TestCase {
- String[] input = new String[] { "1:85\t2:217\t4:173", "0:85\t5:80",
- "0:217\t6:186\t7:103", "7:183", "0:173\t9:502", "1:80\t8:250", "2:186",
- "3:183\t9:167\t2:103", "5:250\t9:84", "4:502\t7:167\t8:84" };
+ String[] input = new String[] {
+ "1:85\t2:217\t4:173",
+ "0:85\t5:80",
+ "0:217\t6:186\t7:103",
+ "7:183",
+ "0:173\t9:502",
+ "1:80\t8:250",
+ "2:186",
+ "3:183\t9:167\t2:103",
+ "5:250\t9:84",
+ "4:502\t7:167\t8:84" };
private static String INPUT = "/tmp/sssp-tmp.seq";
private static String TEXT_INPUT = "/tmp/sssp.txt";
@@ -59,7 +67,7 @@ public class SSSPTest extends TestCase {
generateTestData();
try {
- SSSP.main(new String[] { "0", INPUT, OUTPUT, "2" });
+ SSSP.main(new String[] { "0", INPUT, OUTPUT, "3" });
verifyResult();
} finally {
deleteTempDirs();
@@ -86,6 +94,7 @@ public class SSSPTest extends TestCase {
String line = null;
while ((line = reader.readLine()) != null) {
String[] split = line.split("\t");
+ System.out.println(split[1] + " = " + (int) rs.get(split[0]));
assertEquals(Integer.parseInt(split[1]), (int) rs.get(split[0]));
}
}