You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@hama.apache.org by ed...@apache.org on 2013/05/12 10:45:27 UTC
svn commit: r1481494 - in /hama/trunk: CHANGES.txt
core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Author: edwardyoon
Date: Sun May 12 08:45:27 2013
New Revision: 1481494
URL: http://svn.apache.org/r1481494
Log:
HAMA-754: PartitioningRunner should write raw records to partition files (edwardyoon)
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1481494&r1=1481493&r2=1481494&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Sun May 12 08:45:27 2013
@@ -14,6 +14,7 @@ Release 0.7 (unreleased changes)
IMPROVEMENTS
+ HAMA-754: PartitioningRunner should write raw records to partition files (edwardyoon)
HAMA-707: BSPMessageBundle should be able to encapsulate messages serialized in ByteBuffer (surajsmenon)
HAMA-722: Messaging queue should construct sender and receiver queue (surajsmenon)
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1481494&r1=1481493&r2=1481494&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java Sun May 12 08:45:27 2013
@@ -146,8 +146,6 @@ public class PartitioningRunner extends
Class keyClass = null;
Class valueClass = null;
- Class outputKeyClass = null;
- Class outputValueClass = null;
while ((pair = peer.readNext()) != null) {
if (keyClass == null && valueClass == null) {
keyClass = pair.getKey().getClass();
@@ -160,11 +158,6 @@ public class PartitioningRunner extends
continue;
}
- if (outputKeyClass == null && outputValueClass == null) {
- outputKeyClass = outputPair.getKey().getClass();
- outputValueClass = outputPair.getValue().getClass();
- }
-
int index = converter.getPartitionId(outputPair, partitioner, conf, peer,
desiredNum);
@@ -173,7 +166,7 @@ public class PartitioningRunner extends
map = converter.newMap();
values.put(index, map);
}
- map.put(outputPair.getKey(), outputPair.getValue());
+ map.put(pair.getKey(), pair.getValue());
}
// The reason of use of Memory is to reduce file opens
@@ -181,7 +174,7 @@ public class PartitioningRunner extends
Path destFile = new Path(partitionDir + "/part-" + e.getKey() + "/file-"
+ peer.getPeerIndex());
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
- destFile, outputKeyClass, outputValueClass, CompressionType.NONE);
+ destFile, keyClass, valueClass, CompressionType.NONE);
for (Map.Entry<Writable, Writable> v : e.getValue().entrySet()) {
writer.append(v.getKey(), v.getValue());
@@ -210,8 +203,7 @@ public class PartitioningRunner extends
FileStatus[] files = fs.listStatus(stat.getPath());
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
- partitionFile, outputKeyClass, outputValueClass,
- CompressionType.NONE);
+ partitionFile, keyClass, valueClass, CompressionType.NONE);
for (int i = 0; i < files.length; i++) {
LOG.debug("merge '" + files[i].getPath() + "' into " + partitionDir
@@ -220,10 +212,9 @@ public class PartitioningRunner extends
SequenceFile.Reader reader = new SequenceFile.Reader(fs,
files[i].getPath(), conf);
- Writable key = (Writable) ReflectionUtils.newInstance(outputKeyClass,
+ Writable key = (Writable) ReflectionUtils.newInstance(keyClass, conf);
+ Writable value = (Writable) ReflectionUtils.newInstance(valueClass,
conf);
- Writable value = (Writable) ReflectionUtils.newInstance(
- outputValueClass, conf);
while (reader.next(key, value)) {
writer.append(key, value);
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1481494&r1=1481493&r2=1481494&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Sun May 12 08:45:27 2013
@@ -26,17 +26,20 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.Constants;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.Combiner;
import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.Partitioner;
+import org.apache.hama.bsp.PartitioningRunner.DefaultRecordConverter;
+import org.apache.hama.bsp.PartitioningRunner.RecordConverter;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.graph.IDSkippingIterator.Strategy;
+import org.apache.hama.util.KeyValuePair;
import org.apache.hama.util.ReflectionUtils;
/**
@@ -373,19 +376,27 @@ public final class GraphJobRunner<V exte
/**
* Loads vertices into memory of each peer.
*/
+ @SuppressWarnings("unchecked")
private void loadVertices(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException, SyncException, InterruptedException {
final boolean selfReference = conf.getBoolean("hama.graph.self.ref", false);
- LOG.debug("Vertex class: " + vertexClass);
+ RecordConverter converter = org.apache.hadoop.util.ReflectionUtils
+ .newInstance(conf.getClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER,
+ DefaultRecordConverter.class, RecordConverter.class), conf);
// our VertexInputReader ensures incoming vertices are sorted by their ID
Vertex<V, E, M> vertex = GraphJobRunner
.<V, E, M> newVertexInstance(VERTEX_CLASS);
- vertex.runner = this;
- while (peer.readNext(vertex, NullWritable.get())) {
+ KeyValuePair<Writable, Writable> record = null;
+ KeyValuePair<Writable, Writable> converted = null;
+ while ((record = peer.readNext()) != null) {
+ converted = converter.convertRecord(record, conf);
+ vertex = (Vertex<V, E, M>) converted.getKey();
+ vertex.runner = this;
vertex.setup(conf);
+
if (selfReference) {
vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
}