You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2015/04/13 03:00:07 UTC
svn commit: r1673079 - in /hama/trunk: ./
core/src/main/java/org/apache/hama/ core/src/main/java/org/apache/hama/bsp/
core/src/main/java/org/apache/hama/bsp/message/
core/src/test/java/org/apache/hama/bsp/
core/src/test/java/org/apache/hama/bsp/message...
Author: edwardyoon
Date: Mon Apr 13 01:00:06 2015
New Revision: 1673079
URL: http://svn.apache.org/r1673079
Log:
HAMA-946: Improving graph package
Removed:
hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
Modified:
hama/trunk/core/src/main/java/org/apache/hama/Constants.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/KCore.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java
hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
hama/trunk/pom.xml
Modified: hama/trunk/core/src/main/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/Constants.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/Constants.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/Constants.java Mon Apr 13 01:00:06 2015
@@ -129,7 +129,7 @@ public interface Constants {
public static final String RUNTIME_DESIRED_PEERS_COUNT = "desired.num.of.tasks";
public static final String RUNTIME_PARTITION_RECORDCONVERTER = "bsp.runtime.partition.recordconverter";
- public static final String PARTITION_SORT_BY_KEY = "bsp.partition.sort.by.converted.record";
+ public static final String PARTITION_SORT_BY_KEY = "bsp.partition.sort.by.converted.record";
// /////////////////////////////////////
// Constants for ZooKeeper
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Mon Apr 13 01:00:06 2015
@@ -45,7 +45,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
@@ -58,6 +58,11 @@ import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.message.MessageManager;
+import org.apache.hama.bsp.message.OutgoingMessageManager;
+import org.apache.hama.bsp.message.OutgoingPOJOMessageBundle;
+import org.apache.hama.bsp.message.queue.MessageQueue;
+import org.apache.hama.bsp.message.queue.SortedMemoryQueue;
import org.apache.hama.ipc.HamaRPCProtocolVersion;
import org.apache.hama.ipc.JobSubmissionProtocol;
import org.apache.hama.ipc.RPC;
@@ -346,6 +351,7 @@ public class BSPJobClient extends Config
InputSplit[] splits = job.getInputFormat().getSplits(job, maxTasks);
+ /*
job = partition(job, splits, maxTasks);
maxTasks = job.getInt("hama.partition.count", maxTasks);
@@ -358,6 +364,7 @@ public class BSPJobClient extends Config
"Job failed! The number of splits has exceeded the number of max tasks. The number of splits: "
+ splits.length + ", The number of max tasks: " + maxTasks);
}
+ */
job.setNumBspTask(writeSplits(job, splits, submitSplitFile, maxTasks));
job.set("bsp.job.split.file", submitSplitFile.toString());
@@ -451,13 +458,22 @@ public class BSPJobClient extends Config
+ partitioningJob.getJobName());
LOG.debug("partitioningJob input: "
+ partitioningJob.get(Constants.JOB_INPUT_DIR));
+
+ partitioningJob.getConfiguration().setClass(MessageManager.OUTGOING_MESSAGE_MANAGER_CLASS,
+ OutgoingPOJOMessageBundle.class, OutgoingMessageManager.class);
+ partitioningJob.getConfiguration().setClass(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
+ SortedMemoryQueue.class, MessageQueue.class);
+
partitioningJob.setInputFormat(job.getInputFormat().getClass());
partitioningJob.setInputKeyClass(job.getInputKeyClass());
partitioningJob.setInputValueClass(job.getInputValueClass());
- partitioningJob.setOutputFormat(NullOutputFormat.class);
- partitioningJob.setOutputKeyClass(NullWritable.class);
- partitioningJob.setOutputValueClass(NullWritable.class);
+
+ partitioningJob.setOutputFormat(SequenceFileOutputFormat.class);
+ partitioningJob.setOutputKeyClass(job.getInputKeyClass());
+ partitioningJob.setOutputValueClass(job.getInputValueClass());
+
partitioningJob.setBspClass(PartitioningRunner.class);
+ partitioningJob.setMessageClass(MapWritable.class);
partitioningJob.set("bsp.partitioning.runner.job", "true");
partitioningJob.getConfiguration().setBoolean(
Constants.ENABLE_RUNTIME_PARTITIONING, false);
@@ -554,15 +570,15 @@ public class BSPJobClient extends Config
RawSplit rawSplit = new RawSplit();
for (InputSplit split : splits) {
+ /*
// set partitionID to rawSplit
- if (split.getClass().getName().equals(FileSplit.class.getName())
- && job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_CLASS) != null
- && job.get("bsp.partitioning.runner.job") == null) {
+ if (split.getClass().getName().equals(FileSplit.class.getName())) {
LOG.debug(((FileSplit) split).getPath().getName());
String[] extractPartitionID = ((FileSplit) split).getPath().getName()
.split("[-]");
rawSplit.setPartitionID(Integer.parseInt(extractPartitionID[1]));
}
+ */
rawSplit.setClassName(split.getClass().getName());
buffer.reset();
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java Mon Apr 13 01:00:06 2015
@@ -65,11 +65,29 @@ public class BSPMessageBundle<M extends
}
kryo.writeObject(output, message);
- output.flush();
-
bundleSize++;
}
-
+
+ public void addMessages(Iterator<M> iterator) {
+ M message = iterator.next();
+ if (className == null) {
+ className = message.getClass().getName();
+ kryo.register(message.getClass());
+ }
+
+ kryo.writeObject(output, message);
+ bundleSize++;
+
+ while(iterator.hasNext()) {
+ kryo.writeObject(output, iterator.next());
+ bundleSize++;
+ }
+ }
+
+ public void finishAddition() {
+ output.flush();
+ }
+
public byte[] getBuffer() {
return outputStream.toByteArray();
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Mon Apr 13 01:00:06 2015
@@ -352,7 +352,7 @@ public class LocalBSPRunner implements J
throws IOException {
peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGE_BYTES_TRANSFERED,
bundle.getLength());
-
+
MANAGER_MAP.get(addr).localQueueForNextIteration.addBundle(bundle);
peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
bundle.size());
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Mon Apr 13 01:00:06 2015
@@ -170,6 +170,7 @@ public abstract class AbstractMessageMan
public void send(String peerName, M msg) throws IOException {
outgoingMessageManager.addMessage(peerName, msg);
peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_SENT, 1L);
+
notifySentMessage(peerName, msg);
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java Mon Apr 13 01:00:06 2015
@@ -70,6 +70,9 @@ public class OutgoingPOJOMessageBundle<M
@Override
public Iterator<Entry<InetSocketAddress, BSPMessageBundle<M>>> getBundleIterator() {
+ for(BSPMessageBundle<M> b : outgoingBundles.values()) {
+ b.finishAddition();
+ }
return outgoingBundles.entrySet().iterator();
}
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java Mon Apr 13 01:00:06 2015
@@ -38,6 +38,7 @@ public class TestBSPMessageBundle extend
// Serialize it.
ByteArrayOutputStream baos = new ByteArrayOutputStream();
bundle.write(new DataOutputStream(baos));
+ bundle.finishAddition();
baos.close();
// Deserialize it.
BSPMessageBundle<BytesWritable> readBundle = new BSPMessageBundle<BytesWritable>();
@@ -64,6 +65,8 @@ public class TestBSPMessageBundle extend
testMessages[i] = msg;
bundle.addMessage(testMessages[i]);
}
+ bundle.finishAddition();
+
// Serialize it.
ByteArrayOutputStream baos = new ByteArrayOutputStream();
bundle.write(new DataOutputStream(baos));
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Mon Apr 13 01:00:06 2015
@@ -653,7 +653,8 @@ public class TestCheckpoint extends Test
BSPMessageBundle<Text> bundleRead = messenger.getLoopbackBundle();
assertEquals(5, bundleRead.size());
-
+ bundleRead.finishAddition();
+
String recoveredMsg = bundleRead.iterator().next().toString();
assertEquals(recoveredMsg, "data");
dfs.delete(new Path("checkpoint"), true);
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java Mon Apr 13 01:00:06 2015
@@ -91,6 +91,8 @@ public class TestHamaAsyncMessageManager
bundle.addMessage(it.next());
}
+ bundle.finishAddition();
+
messageManager.transfer(peer, bundle);
messageManager.clearOutgoingMessages();
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java Mon Apr 13 01:00:06 2015
@@ -90,7 +90,8 @@ public class TestHamaMessageManager exte
while (it.hasNext()) {
bundle.addMessage(it.next());
}
-
+ bundle.finishAddition();
+
messageManager.transfer(peer, bundle);
messageManager.clearOutgoingMessages();
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java Mon Apr 13 01:00:06 2015
@@ -50,6 +50,8 @@ public class TestBSPMessageCompressor ex
a.addMessage(new IntWritable(i));
}
+ a.finishAddition();
+
ByteArrayOutputStream byteBuffer = new ByteArrayOutputStream();
DataOutputStream bufferDos = new DataOutputStream(byteBuffer);
a.write(bufferDos);
Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java Mon Apr 13 01:00:06 2015
@@ -191,8 +191,12 @@ public final class BipartiteMatching {
job.setEdgeValueClass(NullWritable.class);
job.setInputFormat(TextInputFormat.class);
+ job.setInputKeyClass(LongWritable.class);
+ job.setInputValueClass(Text.class);
+
job.setVertexInputReaderClass(BipartiteMatchingVertexReader.class);
job.setPartitioner(HashPartitioner.class);
+
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(TextPair.class);
Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java Mon Apr 13 01:00:06 2015
@@ -153,7 +153,9 @@ public class DynamicGraph {
graphJob.setEdgeValueClass(NullWritable.class);
graphJob.setInputFormat(TextInputFormat.class);
-
+ graphJob.setInputKeyClass(LongWritable.class);
+ graphJob.setInputValueClass(Text.class);
+
graphJob.setVertexInputReaderClass(GraphTextReader.class);
graphJob.setPartitioner(HashPartitioner.class);
Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/KCore.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/KCore.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/KCore.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/KCore.java Mon Apr 13 01:00:06 2015
@@ -23,7 +23,9 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.TextInputFormat;
import org.apache.hama.graph.GraphJob;
import org.apache.hama.ml.kcore.KCoreMessage;
import org.apache.hama.ml.kcore.KCoreVertex;
@@ -50,6 +52,10 @@ public class KCore {
graphJob.setOutputKeyClass(LongWritable.class);
graphJob.setOutputValueClass(IntWritable.class);
+ graphJob.setInputFormat(TextInputFormat.class);
+ graphJob.setInputKeyClass(LongWritable.class);
+ graphJob.setInputValueClass(Text.class);
+
return graphJob;
}
Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java Mon Apr 13 01:00:06 2015
@@ -135,6 +135,8 @@ public class PageRank {
pageJob.setEdgeValueClass(NullWritable.class);
pageJob.setInputFormat(SequenceFileInputFormat.class);
+ pageJob.setInputKeyClass(Text.class);
+ pageJob.setInputValueClass(TextArrayWritable.class);
pageJob.setPartitioner(HashPartitioner.class);
pageJob.setOutputFormat(TextOutputFormat.class);
Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java Mon Apr 13 01:00:06 2015
@@ -108,7 +108,7 @@ public class BipartiteMatchingTest exten
private void verifyResult() throws IOException {
FileStatus[] files = fs.globStatus(new Path(OUTPUT + "/part-*"));
- assertTrue("Not enough files found: " + files.length, files.length == 2);
+ assertTrue("Not enough files found: " + files.length, files.length == 1);
for (FileStatus file : files) {
if (file.getLen() > 0) {
Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java Mon Apr 13 01:00:06 2015
@@ -79,7 +79,7 @@ public class PageRankTest extends TestCa
private void generateTestData() {
try {
- FastGraphGen.main(new String[] { "400", "10", INPUT, "3" });
+ FastGraphGen.main(new String[] { "60", "3", INPUT, "3" });
} catch (Exception e) {
e.printStackTrace();
}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java Mon Apr 13 01:00:06 2015
@@ -46,6 +46,7 @@ public final class GraphJobMessage imple
public static final int MAP_FLAG = 0x01;
public static final int VERTEX_FLAG = 0x02;
public static final int VERTICES_SIZE_FLAG = 0x04;
+ public static final int PARTITION_FLAG = 0x08;
// default flag to -1 "unknown"
private int flag = -1;
@@ -54,7 +55,8 @@ public final class GraphJobMessage imple
private WritableComparable vertexId;
private IntWritable integerMessage;
private static GraphJobMessageComparator comparator;
-
+ private Vertex<?, ?, ?> vertex;
+
private int numOfValues = 0;
private final ByteArrayOutputStream byteBuffer = new ByteArrayOutputStream();
@@ -106,6 +108,10 @@ public final class GraphJobMessage imple
return map;
}
+ public Vertex<?, ?, ?> getVertex() {
+ return vertex;
+ }
+
public WritableComparable<?> getVertexId() {
return vertexId;
}
@@ -132,13 +138,24 @@ public final class GraphJobMessage imple
byteBuffer.write(a.toByteArray());
numOfValues++;
} catch (IOException e) {
+ // TODO Auto-generated catch block
e.printStackTrace();
}
}
public void addAll(List<Writable> values) {
- for (Writable v : values)
- add(v);
+ ByteArrayOutputStream a = new ByteArrayOutputStream();
+ DataOutputStream b = new DataOutputStream(a);
+ try {
+ for (Writable v : values) {
+ v.write(b);
+ }
+
+ byteBuffer.write(a.toByteArray());
+ numOfValues += values.size();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
}
public int getNumOfValues() {
@@ -150,6 +167,11 @@ public final class GraphJobMessage imple
this.integerMessage = size;
}
+ public GraphJobMessage(Vertex<?, ?, ?> vertex) {
+ this.flag = PARTITION_FLAG;
+ this.vertex = vertex;
+ }
+
@Override
public void write(DataOutput out) throws IOException {
out.writeByte(this.flag);
@@ -165,6 +187,8 @@ public final class GraphJobMessage imple
map.write(out);
} else if (isVerticesSizeMessage()) {
integerMessage.write(out);
+ } else if (isPartitioningMessage()) {
+ vertex.write(out);
} else {
vertexId.write(out);
}
@@ -210,6 +234,9 @@ public final class GraphJobMessage imple
} else if (isVerticesSizeMessage()) {
integerMessage = new IntWritable();
integerMessage.readFields(in);
+ } else if (isPartitioningMessage()) {
+ vertex = (Vertex<?, ?, ?>) ReflectionUtils.newInstance(GraphJobRunner.VERTEX_CLASS, null);
+ vertex.readFields(in);
} else {
vertexId = ReflectionUtils.newInstance(GraphJobRunner.VERTEX_ID_CLASS,
null);
@@ -255,6 +282,10 @@ public final class GraphJobMessage imple
return flag == VERTICES_SIZE_FLAG;
}
+ public boolean isPartitioningMessage() {
+ return flag == PARTITION_FLAG;
+ }
+
@Override
public String toString() {
if (isVertexMessage()) {
@@ -265,7 +296,7 @@ public final class GraphJobMessage imple
return "#Vertices: " + integerMessage;
} else {
return "GraphJobMessage [flag=" + flag + ", map=" + map + ", vertexId="
- + vertexId + ", vertexValue=" + numOfValues + "]";
+ + vertexId + ", vertexValue=" + numOfValues + ", " + vertex.toString() + "]";
}
}
@@ -308,14 +339,14 @@ public final class GraphJobMessage imple
}
}
-
public Iterable<Writable> getIterableMessages() {
return new Iterable<Writable>() {
@Override
public Iterator<Writable> iterator() {
return new Iterator<Writable>() {
- ByteArrayInputStream bis = new ByteArrayInputStream(byteBuffer.toByteArray());
+ ByteArrayInputStream bis = new ByteArrayInputStream(
+ byteBuffer.toByteArray());
DataInputStream dis = new DataInputStream(bis);
int index = 0;
@@ -344,5 +375,4 @@ public final class GraphJobMessage imple
};
}
-
}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Mon Apr 13 01:00:06 2015
@@ -18,9 +18,11 @@
package org.apache.hama.graph;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
@@ -39,12 +41,12 @@ import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.Partitioner;
-import org.apache.hama.bsp.PartitioningRunner.DefaultRecordConverter;
-import org.apache.hama.bsp.PartitioningRunner.RecordConverter;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.commons.util.KeyValuePair;
import org.apache.hama.util.ReflectionUtils;
+import com.google.common.collect.Lists;
+
/**
* Fully generic graph job runner.
*
@@ -100,7 +102,7 @@ public final class GraphJobRunner<V exte
private long numberVertices = 0;
// -1 is deactivated
private int maxIteration = -1;
- private long iteration;
+ private long iteration = 0;
private AggregationRunner<V, E, M> aggregationRunner;
private VertexOutputWriter<Writable, Writable, V, E, M> vertexOutputWriter;
@@ -114,12 +116,20 @@ public final class GraphJobRunner<V exte
setupFields(peer);
+ long startTime = System.currentTimeMillis();
loadVertices(peer);
+ LOG.info("Total time spent for loading vertices: "
+ + (System.currentTimeMillis() - startTime) + " ms");
+ startTime = System.currentTimeMillis();
countGlobalVertexCount(peer);
+ LOG.info("Total time spent for broadcasting global vertex count: "
+ + (System.currentTimeMillis() - startTime) + " ms");
+ startTime = System.currentTimeMillis();
doInitialSuperstep(peer);
-
+ LOG.info("Total time spent for initial superstep: "
+ + (System.currentTimeMillis() - startTime) + " ms");
}
@Override
@@ -137,15 +147,23 @@ public final class GraphJobRunner<V exte
// note that the messages must be parsed here
GraphJobMessage firstVertexMessage = parseMessages(peer);
+
+ long startTime = System.currentTimeMillis();
// master/slaves needs to update
doAggregationUpdates(peer);
+ LOG.info("Total time spent for broadcasting aggregation values: "
+ + (System.currentTimeMillis() - startTime) + " ms");
+
// check if updated changed by our aggregators
if (!updated) {
break;
}
// loop over vertices and do their computation
+ startTime = System.currentTimeMillis();
doSuperstep(firstVertexMessage, peer);
+ LOG.info("Total time spent for " + peer.getSuperstepCount()
+ + " superstep: " + (System.currentTimeMillis() - startTime) + " ms");
if (isMasterTask(peer)) {
peer.getCounter(GraphJobCounter.ITERATIONS).increment(1);
@@ -249,7 +267,7 @@ public final class GraphJobRunner<V exte
currentMessage = peer.getCurrentMessage();
}
-
+
for (V v : notComputedVertices) {
vertex = vertices.get(v);
if (!vertex.isHalted()) {
@@ -275,16 +293,23 @@ public final class GraphJobRunner<V exte
this.changedVertexCnt = 0;
vertices.startSuperstep();
- Iterator<Vertex<V, E, M>> iterator = vertices.iterator();
-
- while (iterator.hasNext()) {
- Vertex<V, E, M> vertex = iterator.next();
-
- // Calls setup method.
- vertex.setup(conf);
-
- vertex.compute(Collections.singleton(vertex.getValue()));
- vertices.finishVertexComputation(vertex);
+ List<Thread> runners = new ArrayList<Thread>();
+ List<Vertex<V, E, M>> v = new ArrayList<Vertex<V, E, M>>(
+ vertices.getValues());
+ for (List<Vertex<V, E, M>> partition : Lists.partition(v, conf.getInt("hama.graph.thread.num", 30))) {
+ runners.add(new Computer(partition));
+ }
+
+ for (Thread computer : runners) {
+ computer.start();
+ }
+
+ for (Thread computer : runners) {
+ try {
+ computer.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
}
vertices.finishSuperstep();
@@ -292,6 +317,27 @@ public final class GraphJobRunner<V exte
iteration++;
}
+ class Computer extends Thread {
+ List<Vertex<V, E, M>> partition;
+
+ public Computer(List<Vertex<V, E, M>> partition) {
+ this.partition = partition;
+ }
+
+ @Override
+ public void run() {
+ try {
+ for (Vertex<V, E, M> v : partition) {
+ v.setup(conf);
+ v.compute(Collections.singleton(v.getValue()));
+ vertices.finishVertexComputation(v);
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
@SuppressWarnings("unchecked")
private void setupFields(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
@@ -350,44 +396,33 @@ public final class GraphJobRunner<V exte
private void loadVertices(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException, SyncException, InterruptedException {
- RecordConverter converter = org.apache.hadoop.util.ReflectionUtils
+ final VertexInputReader<Writable, Writable, V, E, M> reader = (VertexInputReader<Writable, Writable, V, E, M>) ReflectionUtils
.newInstance(conf.getClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER,
- DefaultRecordConverter.class, RecordConverter.class), conf);
-
- // our VertexInputReader ensures incoming vertices are sorted by their ID
- Vertex<V, E, M> vertex = GraphJobRunner
- .<V, E, M> newVertexInstance(VERTEX_CLASS);
- Vertex<V, E, M> currentVertex = GraphJobRunner
- .<V, E, M> newVertexInstance(VERTEX_CLASS);
-
- KeyValuePair<Writable, Writable> record = null;
- KeyValuePair<Writable, Writable> converted = null;
-
- while ((record = peer.readNext()) != null) {
- converted = converter.convertRecord(record, conf);
- currentVertex = (Vertex<V, E, M>) converted.getValue();
-
- if (vertex.getVertexID() == null) {
- vertex = currentVertex;
- } else {
- if (vertex.getVertexID().equals(currentVertex.getVertexID())) {
- for (Edge<V, E> edge : currentVertex.getEdges()) {
- vertex.addEdge(edge);
- }
- } else {
- if (vertex.compareTo(currentVertex) > 0) {
- throw new IOException(
- "The records of split aren't in order by vertex ID.");
- }
+ VertexInputReader.class));
- addVertex(vertex);
- vertex = currentVertex;
+ try {
+ KeyValuePair<Writable, Writable> next = null;
+ while ((next = peer.readNext()) != null) {
+ Vertex<V, E, M> vertex = GraphJobRunner
+ .<V, E, M> newVertexInstance(VERTEX_CLASS);
+
+ boolean vertexFinished = reader.parseVertex(next.getKey(),
+ next.getValue(), vertex);
+ if (!vertexFinished) {
+ continue;
}
+ peer.send(getHostName(vertex.getVertexID()),
+ new GraphJobMessage(vertex));
}
+ } catch (Exception e) {
+ e.printStackTrace();
}
- // add last vertex.
- addVertex(vertex);
+ peer.sync();
+ GraphJobMessage received;
+ while ((received = peer.getCurrentMessage()) != null) {
+ addVertex((Vertex<V, E, M>) received.getVertex());
+ }
LOG.info(vertices.size() + " vertices are loaded into "
+ peer.getPeerName());
LOG.debug("Starting Vertex processing!");
@@ -403,6 +438,7 @@ public final class GraphJobRunner<V exte
vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
}
+ vertex.setRunner(this);
vertices.put(vertex);
LOG.debug("Added VertexID: " + vertex.getVertexID() + " in peer "
@@ -535,6 +571,20 @@ public final class GraphJobRunner<V exte
vertices.finishSuperstep();
}
+ public void sendMessage(V dstinationVertexID, M msg) throws IOException {
+ peer.send(getHostName(dstinationVertexID), new GraphJobMessage(
+ dstinationVertexID, msg));
+ }
+
+ /**
+ * @return the destination peer name of the destination of the given directed
+ * edge.
+ */
+ public String getHostName(V vertexID) {
+ return peer.getPeerName(getPartitioner().getPartition(vertexID, null,
+ peer.getNumPeers()));
+ }
+
/**
* @return the number of vertices, globally accumulated.
*/
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java Mon Apr 13 01:00:06 2015
@@ -73,7 +73,7 @@ public class IncomingVertexMessageManage
public void add(GraphJobMessage item) {
if (item.isVertexMessage()) {
msgPerVertex.add(item.getVertexId(), item);
- } else if (item.isMapMessage() || item.isVerticesSizeMessage()) {
+ } else {
mapMessages.add(item);
}
}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java Mon Apr 13 01:00:06 2015
@@ -18,6 +18,7 @@
package org.apache.hama.graph;
import java.io.IOException;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -40,19 +41,21 @@ import org.apache.hama.bsp.TaskAttemptID
*/
public final class MapVerticesInfo<V extends WritableComparable<V>, E extends Writable, M extends Writable>
implements VerticesInfo<V, E, M> {
- private GraphJobRunner<V, E, M> runner;
-
private final Map<V, Vertex<V, E, M>> vertices = new HashMap<V, Vertex<V, E, M>>();
@Override
public void init(GraphJobRunner<V, E, M> runner, HamaConfiguration conf,
TaskAttemptID attempt) throws IOException {
- this.runner = runner;
}
@Override
public void put(Vertex<V, E, M> vertex) throws IOException {
- vertices.put(vertex.getVertexID(), vertex);
+ if(vertices.containsKey(vertex.getVertexID())) {
+ for(Edge<V, E> e : vertex.getEdges())
+ vertices.get(vertex.getVertexID()).addEdge(e);
+ } else {
+ vertices.put(vertex.getVertexID(), vertex);
+ }
}
@Override
@@ -65,15 +68,18 @@ public final class MapVerticesInfo<V ext
}
@Override
+ public Collection<Vertex<V, E, M>> getValues() {
+ return vertices.values();
+ }
+
+ @Override
public int size() {
return vertices.size();
}
@Override
public Vertex<V, E, M> get(V vertexID) {
- Vertex<V, E, M> vertex = vertices.get(vertexID);
- vertex.setRunner(runner);
- return vertex;
+ return vertices.get(vertexID);
}
@Override
@@ -90,9 +96,7 @@ public final class MapVerticesInfo<V ext
@Override
public Vertex<V, E, M> next() {
- Vertex<V, E, M> vertex = vertexIterator.next();
- vertex.setRunner(runner);
- return vertex;
+ return vertexIterator.next();
}
@Override
@@ -111,7 +115,7 @@ public final class MapVerticesInfo<V ext
@Override
public void finishVertexComputation(Vertex<V, E, M> vertex)
throws IOException {
- vertices.put(vertex.getVertexID(), vertex);
+ // do nothing
}
@Override
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java Mon Apr 13 01:00:06 2015
@@ -108,12 +108,9 @@ public class OutgoingVertexMessageManage
MessagePerVertex msgStorage = storage.get(bundle.getKey());
if (msgStorage != null) {
- Iterator<GraphJobMessage> it = msgStorage.iterator();
- while (it.hasNext()) {
- bundle.getValue().addMessage(it.next());
- }
+ bundle.getValue().addMessages(msgStorage.iterator());
}
-
+ bundle.getValue().finishAddition();
storage.remove(bundle.getKey());
return bundle;
}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Mon Apr 13 01:00:06 2015
@@ -76,28 +76,14 @@ public abstract class Vertex<V extends W
@Override
public void sendMessage(Edge<V, E> e, M msg) throws IOException {
- runner.getPeer().send(getDestinationPeerName(e),
- new GraphJobMessage(e.getDestinationVertexID(), msg));
+ runner.sendMessage(e.getDestinationVertexID(), msg);
}
- /**
- * @return the destination peer name of the destination of the given directed
- * edge.
- */
- public String getDestinationPeerName(Edge<V, E> edge) {
- return getDestinationPeerName(edge.getDestinationVertexID());
- }
-
- /**
- * @return the destination peer name of the given vertex id, determined by the
- * partitioner.
- */
- public String getDestinationPeerName(V vertexId) {
- return runner.getPeer().getPeerName(
- getPartitioner().getPartition(vertexId, value,
- runner.getPeer().getNumPeers()));
+ @Override
+ public void sendMessage(V destinationVertexID, M msg) throws IOException {
+ runner.sendMessage(destinationVertexID, msg);
}
-
+
@Override
public void sendMessageToNeighbors(M msg) throws IOException {
final List<Edge<V, E>> outEdges = this.getEdges();
@@ -106,15 +92,6 @@ public abstract class Vertex<V extends W
}
}
- @Override
- public void sendMessage(V destinationVertexID, M msg) throws IOException {
- int partition = getPartitioner().getPartition(destinationVertexID, msg,
- runner.getPeer().getNumPeers());
- String destPeer = runner.getPeer().getAllPeerNames()[partition];
- runner.getPeer().send(destPeer,
- new GraphJobMessage(destinationVertexID, msg));
- }
-
private void alterVertexCounter(int i) throws IOException {
this.runner.setChangedVertexCnt(this.runner.getChangedVertexCnt() + i);
}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java Mon Apr 13 01:00:06 2015
@@ -18,6 +18,7 @@
package org.apache.hama.graph;
import java.io.IOException;
+import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
@@ -79,6 +80,8 @@ public interface VerticesInfo<V extends
*/
public Set<V> keySet();
+ public Collection<Vertex<V, E, M>> getValues();
+
/**
* Finish the additions, from this point on the implementations should close
* the adds and throw exceptions in case something is added after this call.
@@ -101,4 +104,5 @@ public interface VerticesInfo<V extends
*/
public void finishSuperstep() throws IOException;
+
}
Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java Mon Apr 13 01:00:06 2015
@@ -62,7 +62,6 @@ public class TestSubmitGraphJob extends
public void setUp() throws Exception {
super.setUp();
vi.add(MapVerticesInfo.class);
- vi.add(OffHeapVerticesInfo.class);
}
@Override
@@ -119,7 +118,7 @@ public class TestSubmitGraphJob extends
@SuppressWarnings("rawtypes")
protected void injectVerticesInfo() {
Class<? extends VerticesInfo> verticesInfoClass = vi.get(Math
- .abs(new Random().nextInt() % 2));
+ .abs(new Random().nextInt() % 1));
LOG.info("using vertices info of type : " + verticesInfoClass.getName());
}
Modified: hama/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/pom.xml?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/pom.xml (original)
+++ hama/trunk/pom.xml Mon Apr 13 01:00:06 2015
@@ -126,24 +126,6 @@
<profiles>
<profile>
- <id>doclint-java8-disable</id>
- <activation>
- <jdk>[1.8,)</jdk>
- </activation>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-javadoc-plugin</artifactId>
- <configuration>
- <additionalparam>-Xdoclint:none</additionalparam>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </profile>
-
- <profile>
<id>hadoop1</id>
<modules>
@@ -493,6 +475,7 @@
<version>2.9.1</version>
<configuration>
<aggregate>true</aggregate>
+ <failOnError>false</failOnError>
<outputDirectory>docs/apidocs</outputDirectory>
</configuration>
<executions>