You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2013/05/12 10:45:27 UTC

svn commit: r1481494 - in /hama/trunk: CHANGES.txt core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java

Author: edwardyoon
Date: Sun May 12 08:45:27 2013
New Revision: 1481494

URL: http://svn.apache.org/r1481494
Log:
HAMA-754: PartitioningRunner should write raw records to partition files (edwardyoon)

Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1481494&r1=1481493&r2=1481494&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Sun May 12 08:45:27 2013
@@ -14,6 +14,7 @@ Release 0.7 (unreleased changes)
 
   IMPROVEMENTS
    
+   HAMA-754: PartitioningRunner should write raw records to partition files (edwardyoon)
    HAMA-707: BSPMessageBundle should be able to encapsulate messages serialized in ByteBuffer (surajsmenon) 
    HAMA-722: Messaging queue should construct sender and receiver queue (surajsmenon)
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1481494&r1=1481493&r2=1481494&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java Sun May 12 08:45:27 2013
@@ -146,8 +146,6 @@ public class PartitioningRunner extends
 
     Class keyClass = null;
     Class valueClass = null;
-    Class outputKeyClass = null;
-    Class outputValueClass = null;
     while ((pair = peer.readNext()) != null) {
       if (keyClass == null && valueClass == null) {
         keyClass = pair.getKey().getClass();
@@ -160,11 +158,6 @@ public class PartitioningRunner extends
         continue;
       }
 
-      if (outputKeyClass == null && outputValueClass == null) {
-        outputKeyClass = outputPair.getKey().getClass();
-        outputValueClass = outputPair.getValue().getClass();
-      }
-
       int index = converter.getPartitionId(outputPair, partitioner, conf, peer,
           desiredNum);
 
@@ -173,7 +166,7 @@ public class PartitioningRunner extends
         map = converter.newMap();
         values.put(index, map);
       }
-      map.put(outputPair.getKey(), outputPair.getValue());
+      map.put(pair.getKey(), pair.getValue());
     }
 
     // The reason of use of Memory is to reduce file opens
@@ -181,7 +174,7 @@ public class PartitioningRunner extends
       Path destFile = new Path(partitionDir + "/part-" + e.getKey() + "/file-"
           + peer.getPeerIndex());
       SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
-          destFile, outputKeyClass, outputValueClass, CompressionType.NONE);
+          destFile, keyClass, valueClass, CompressionType.NONE);
 
       for (Map.Entry<Writable, Writable> v : e.getValue().entrySet()) {
         writer.append(v.getKey(), v.getValue());
@@ -210,8 +203,7 @@ public class PartitioningRunner extends
 
         FileStatus[] files = fs.listStatus(stat.getPath());
         SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
-            partitionFile, outputKeyClass, outputValueClass,
-            CompressionType.NONE);
+            partitionFile, keyClass, valueClass, CompressionType.NONE);
 
         for (int i = 0; i < files.length; i++) {
           LOG.debug("merge '" + files[i].getPath() + "' into " + partitionDir
@@ -220,10 +212,9 @@ public class PartitioningRunner extends
           SequenceFile.Reader reader = new SequenceFile.Reader(fs,
               files[i].getPath(), conf);
 
-          Writable key = (Writable) ReflectionUtils.newInstance(outputKeyClass,
+          Writable key = (Writable) ReflectionUtils.newInstance(keyClass, conf);
+          Writable value = (Writable) ReflectionUtils.newInstance(valueClass,
               conf);
-          Writable value = (Writable) ReflectionUtils.newInstance(
-              outputValueClass, conf);
 
           while (reader.next(key, value)) {
             writer.append(key, value);

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1481494&r1=1481493&r2=1481494&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Sun May 12 08:45:27 2013
@@ -26,17 +26,20 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.Constants;
 import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.Combiner;
 import org.apache.hama.bsp.HashPartitioner;
 import org.apache.hama.bsp.Partitioner;
+import org.apache.hama.bsp.PartitioningRunner.DefaultRecordConverter;
+import org.apache.hama.bsp.PartitioningRunner.RecordConverter;
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.graph.IDSkippingIterator.Strategy;
+import org.apache.hama.util.KeyValuePair;
 import org.apache.hama.util.ReflectionUtils;
 
 /**
@@ -373,19 +376,27 @@ public final class GraphJobRunner<V exte
   /**
    * Loads vertices into memory of each peer.
    */
+  @SuppressWarnings("unchecked")
   private void loadVertices(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException, SyncException, InterruptedException {
     final boolean selfReference = conf.getBoolean("hama.graph.self.ref", false);
 
-    LOG.debug("Vertex class: " + vertexClass);
+    RecordConverter converter = org.apache.hadoop.util.ReflectionUtils
+        .newInstance(conf.getClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER,
+            DefaultRecordConverter.class, RecordConverter.class), conf);
 
     // our VertexInputReader ensures incoming vertices are sorted by their ID
     Vertex<V, E, M> vertex = GraphJobRunner
         .<V, E, M> newVertexInstance(VERTEX_CLASS);
-    vertex.runner = this;
-    while (peer.readNext(vertex, NullWritable.get())) {
+    KeyValuePair<Writable, Writable> record = null;
+    KeyValuePair<Writable, Writable> converted = null;
+    while ((record = peer.readNext()) != null) {
+      converted = converter.convertRecord(record, conf);
+      vertex = (Vertex<V, E, M>) converted.getKey();
+      vertex.runner = this;
       vertex.setup(conf);
+
       if (selfReference) {
         vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
       }