You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2013/01/19 01:29:45 UTC

svn commit: r1435426 - in /hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ examples/src/test/java/org/apache/hama/examples/

Author: edwardyoon
Date: Sat Jan 19 00:29:45 2013
New Revision: 1435426

URL: http://svn.apache.org/viewvc?rev=1435426&view=rev
Log:
Splits should be assigned based on PartitionID

Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/SSSPTest.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1435426&r1=1435425&r2=1435426&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Sat Jan 19 00:29:45 2013
@@ -10,7 +10,8 @@ Release 0.7 (unreleased changes)
    HAMA-658: Add random symmetric sparse matrix generator (edwardyoon)
 
   BUG FIXES
-
+ 
+   HAMA-716: Splits should be assigned based on PartitionID (edwardyoon)
    HAMA-712: PartitioningRunner should works for multiple input files (surajsmenon via edwardyoon)
 
   IMPROVEMENTS

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1435426&r1=1435425&r2=1435426&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Sat Jan 19 00:29:45 2013
@@ -555,6 +555,17 @@ public class BSPJobClient extends Config
       DataOutputBuffer buffer = new DataOutputBuffer();
       RawSplit rawSplit = new RawSplit();
       for (InputSplit split : splits) {
+
+        // set partitionID to rawSplit
+        if (split.getClass().getName().equals(FileSplit.class.getName())
+            && job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_CLASS) != null
+            && job.get("bsp.partitioning.runner.job") == null) {
+          LOG.debug(((FileSplit) split).getPath().getName());
+          String[] extractPartitionID = ((FileSplit) split).getPath().getName()
+              .toString().split("[-]");
+          rawSplit.setPartitionID(Integer.parseInt(extractPartitionID[1]));
+        }
+
         rawSplit.setClassName(split.getClass().getName());
         buffer.reset();
         split.write(buffer);
@@ -604,8 +615,12 @@ public class BSPJobClient extends Config
     int len = WritableUtils.readVInt(in);
     RawSplit[] result = new RawSplit[len];
     for (int i = 0; i < len; ++i) {
-      result[i] = new RawSplit();
-      result[i].readFields(in);
+      RawSplit split = new RawSplit();
+      split.readFields(in);
+      if (split.getPartitionID() != Integer.MIN_VALUE)
+        result[split.getPartitionID()] = split;
+      else
+        result[i] = split;
     }
     return result;
   }
@@ -1050,6 +1065,7 @@ public class BSPJobClient extends Config
     private String splitClass;
     private BytesWritable bytes = new BytesWritable();
     private String[] locations;
+    private int partitionID = Integer.MIN_VALUE;
     long dataLength;
 
     public void setBytes(byte[] data, int offset, int length) {
@@ -1060,6 +1076,14 @@ public class BSPJobClient extends Config
       splitClass = className;
     }
 
+    public void setPartitionID(int id) {
+      this.partitionID = id;
+    }
+
+    public int getPartitionID() {
+      return partitionID;
+    }
+
     public String getClassName() {
       return splitClass;
     }
@@ -1084,6 +1108,7 @@ public class BSPJobClient extends Config
     public void readFields(DataInput in) throws IOException {
       splitClass = Text.readString(in);
       dataLength = in.readLong();
+      partitionID = in.readInt();
       bytes.readFields(in);
       int len = WritableUtils.readVInt(in);
       locations = new String[len];
@@ -1096,6 +1121,7 @@ public class BSPJobClient extends Config
     public void write(DataOutput out) throws IOException {
       Text.writeString(out, splitClass);
       out.writeLong(dataLength);
+      out.writeInt(partitionID);
       bytes.write(out);
       WritableUtils.writeVInt(out, locations.length);
       for (int i = 0; i < locations.length; i++) {

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1435426&r1=1435425&r2=1435426&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Sat Jan 19 00:29:45 2013
@@ -230,7 +230,8 @@ public class LocalBSPRunner implements J
       conf.set(Constants.PEER_HOST, "local");
 
       bsp = (BSP) ReflectionUtils.newInstance(
-          job.getConfiguration().getClass("bsp.work.class", BSP.class), job.getConfiguration());
+          job.getConfiguration().getClass("bsp.work.class", BSP.class),
+          job.getConfiguration());
 
     }
 
@@ -240,6 +241,7 @@ public class LocalBSPRunner implements J
       String splitname = null;
       BytesWritable realBytes = null;
       if (splits != null) {
+        LOG.debug(id + ", " + splits[id].getPartitionID());
         splitname = splits[id].getClassName();
         realBytes = splits[id].getBytes();
       }

Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java?rev=1435426&r1=1435425&r2=1435426&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java Sat Jan 19 00:29:45 2013
@@ -60,7 +60,7 @@ public class MindistSearchTest extends T
   public void testMindistSearch() throws Exception {
     generateTestData();
     try {
-      MindistSearch.main(new String[] { INPUT, OUTPUT, "30", "1" });
+      MindistSearch.main(new String[] { INPUT, OUTPUT, "30", "3" });
 
       verifyResult();
     } finally {

Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/SSSPTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/SSSPTest.java?rev=1435426&r1=1435425&r2=1435426&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/SSSPTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/SSSPTest.java Sat Jan 19 00:29:45 2013
@@ -37,9 +37,17 @@ import org.apache.hama.HamaConfiguration
  * Testcase for {@link ShortestPaths}
  */
 public class SSSPTest extends TestCase {
-  String[] input = new String[] { "1:85\t2:217\t4:173", "0:85\t5:80",
-      "0:217\t6:186\t7:103", "7:183", "0:173\t9:502", "1:80\t8:250", "2:186",
-      "3:183\t9:167\t2:103", "5:250\t9:84", "4:502\t7:167\t8:84" };
+  String[] input = new String[] { 
+      "1:85\t2:217\t4:173", 
+      "0:85\t5:80",
+      "0:217\t6:186\t7:103", 
+      "7:183", 
+      "0:173\t9:502", 
+      "1:80\t8:250", 
+      "2:186",
+      "3:183\t9:167\t2:103", 
+      "5:250\t9:84", 
+      "4:502\t7:167\t8:84" };
 
   private static String INPUT = "/tmp/sssp-tmp.seq";
   private static String TEXT_INPUT = "/tmp/sssp.txt";
@@ -59,7 +67,7 @@ public class SSSPTest extends TestCase {
 
     generateTestData();
     try {
-      SSSP.main(new String[] { "0", INPUT, OUTPUT, "2" });
+      SSSP.main(new String[] { "0", INPUT, OUTPUT, "3" });
       verifyResult();
     } finally {
       deleteTempDirs();
@@ -86,6 +94,7 @@ public class SSSPTest extends TestCase {
       String line = null;
       while ((line = reader.readLine()) != null) {
         String[] split = line.split("\t");
+        System.out.println(split[1] + " = " + (int) rs.get(split[0]));
         assertEquals(Integer.parseInt(split[1]), (int) rs.get(split[0]));
       }
     }