You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by an...@apache.org on 2014/10/08 22:03:35 UTC
svn commit: r1630222 - in /hama/trunk: CHANGES.txt
graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
graph/src/main/java/org/apache/hama/graph/VertexMessageIterable.java
graph/src/main/java/org/apache/hama/graph/VertexMessages.java
Author: andronat
Date: Wed Oct 8 20:03:34 2014
New Revision: 1630222
URL: http://svn.apache.org/r1630222
Log:
HAMA-921
Added:
hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexMessages.java
Removed:
hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexMessageIterable.java
Modified:
hama/trunk/CHANGES.txt
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1630222&r1=1630221&r2=1630222&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Wed Oct 8 20:03:34 2014
@@ -17,6 +17,7 @@ Release 0.7.0 (unreleased changes)
HAMA-885: Semi-Clustering is not producing expected output (Renil J via edwardyoon)
IMPROVEMENTS
+ HAMA-921: Polish doSuperstep() function and VertexMessageIterable class (Anastasis Andronidis)
HAMA-913: Add RPC implementation using netty(bsmin)
HAMA-914: Boolean flag (isCompressed) is required only when runtime compression is enabled (edwardyoon)
HAMA-910: Web UI Improvement (Victor Lee via edwardyoon)
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1630222&r1=1630221&r2=1630222&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Wed Oct 8 20:03:34 2014
@@ -20,7 +20,6 @@ package org.apache.hama.graph;
import java.io.IOException;
import java.util.Collections;
import java.util.Map.Entry;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -154,6 +153,8 @@ public final class GraphJobRunner<V exte
/**
* Just write <ID as Writable, Value as Writable> pair as a result. Note that
* this will also be executed when failure happened.
+ * @param peer
+ * @throws java.io.IOException
*/
@Override
public final void cleanup(
@@ -203,7 +204,7 @@ public final class GraphJobRunner<V exte
if (firstVertexMessage != null) {
peer.send(peer.getPeerName(), firstVertexMessage);
}
- GraphJobMessage msg = null;
+ GraphJobMessage msg;
while ((msg = peer.getCurrentMessage()) != null) {
peer.send(peer.getPeerName(), msg);
}
@@ -221,104 +222,68 @@ public final class GraphJobRunner<V exte
/**
* Do the main logic of a superstep, namely checking if vertices are active,
* feeding compute with messages and controlling combiners/aggregators.
+ * We iterate over our messages and vertices in sorted order. That means
+ * that we need to seek the first vertex that has the same ID as the
+ * iterated message.
*/
- @SuppressWarnings("unchecked")
private void doSuperstep(GraphJobMessage currentMessage,
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException {
int activeVertices = 0;
this.changedVertexCnt = 0;
- vertices.startSuperstep();
+ this.vertices.startSuperstep();
- /*
- * We iterate over our messages and vertices in sorted order. That means
- * that we need to seek the first vertex that has the same ID as the
- * currentMessage or the first vertex that is active.
- */
- IDSkippingIterator<V, E, M> iterator = vertices.skippingIterator();
- VertexMessageIterable<V, M> iterable = null;
- Vertex<V, E, M> vertex = null;
+ IDSkippingIterator<V, E, M> iterator = this.vertices.skippingIterator();
+ VertexMessages<V, M> queueMessages = new VertexMessages<V, M>(peer);
+ queueMessages.prependMessage(currentMessage);
// note that can't skip inactive vertices because we have to rewrite the
// complete vertex file in each iteration
- while (iterator.hasNext(
- currentMessage == null ? null : (V) currentMessage.getVertexId(),
- Strategy.ALL)) {
-
- vertex = iterator.next();
- if (currentMessage != null) {
- iterable = iterate(currentMessage, (V) currentMessage.getVertexId(),
- vertex, peer);
- } else {
- iterable = null;
- }
+ V firstVID = currentMessage == null ? null : (V) currentMessage.getVertexId();
+ while (iterator.hasNext(firstVID, Strategy.ALL)) {
+ Vertex<V, E, M> vertex = iterator.next();
+ boolean msgsExist = queueMessages.continueWith(vertex.getVertexID());
- if (iterable != null && vertex.isHalted()) {
+ if (!msgsExist) checkMsgOrder(vertex.getVertexID(), queueMessages);
+
+ if (msgsExist && vertex.isHalted()) {
vertex.setActive();
}
if (!vertex.isHalted()) {
- if (iterable == null) {
- vertex.compute(Collections.<M> emptyList());
- } else {
- vertex.compute(iterable);
- currentMessage = iterable.getOverflowMessage();
- }
+ vertex.compute(queueMessages);
activeVertices++;
}
+ // Dump remaining messages
+ queueMessages.dumpRest();
+
// note that we even need to rewrite the vertex if it is halted for
// consistency reasons
- vertices.finishVertexComputation(vertex);
+ this.vertices.finishVertexComputation(vertex);
}
- vertices.finishSuperstep();
+ this.vertices.finishSuperstep();
getAggregationRunner().sendAggregatorValues(peer, activeVertices,
this.changedVertexCnt);
- iteration++;
+ this.iteration++;
}
/**
- * Iterating utility that ensures following things: <br/>
- * - if vertex is active, but the given message does not match the vertexID,
- * return null. <br/>
- * - if vertex is inactive, but received a message that matches the ID, build
- * an iterator that can be iterated until the next vertex has been reached
- * (not buffer in memory) and set the vertex active <br/>
- * - if vertex is active, and the given message does match the vertexID,
- * return an iterator that can be iterated until the next vertex has been
- * reached. <br/>
- * - if vertex is inactive, and received no message, return null.
+ * Utility that ensures that the incoming messages have a target vertex.
*/
- @SuppressWarnings("unchecked")
- private VertexMessageIterable<V, M> iterate(GraphJobMessage currentMessage,
- V firstMessageId, Vertex<V, E, M> vertex,
- BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
- int comparision = firstMessageId.compareTo(vertex.getVertexID());
- if (conf.getBoolean("hama.check.missing.vertex", true)) {
- if (comparision < 0) {
+ private void checkMsgOrder(V vid, VertexMessages<V, M> vm) {
+ // When the vid is greater than the current message, it means that a vertex
+ // has sent a message to another vertex that doesn't exist
+ if (vm.getMessageVID() != null && vm.getMessageVID().compareTo(vid) < 0) {
+ if (conf.getBoolean("hama.check.missing.vertex", true)) {
throw new IllegalArgumentException(
- "A message has recieved with a destination ID: " + firstMessageId +
- " that does not exist! (Vertex iterator is at" + vertex.getVertexID()
- + " ID)");
- }
- } else {
- while (comparision < 0) {
- VertexMessageIterable<V, M> messageIterable = new VertexMessageIterable<V, M>(
- currentMessage, firstMessageId, peer);
- currentMessage = messageIterable.getOverflowMessage();
- firstMessageId = (V) currentMessage.getVertexId();
- comparision = firstMessageId.compareTo(vertex.getVertexID());
- }
- }
- if (comparision == 0) {
- // vertex id matches with the vertex, return an iterator with newest
- // message
- return new VertexMessageIterable<V, M>(currentMessage,
- vertex.getVertexID(), peer);
- } else {
- // return null
- return null;
+ "A message has recieved with a destination ID: " + vm.getMessageVID()
+ + " that does not exist! (Vertex iterator is at" + vid + " ID)");
+ } else {
+ // Skip all unrecognized messages until we find a match
+ vm.continueUntil(vid);
+ }
}
}
@@ -431,9 +396,9 @@ public final class GraphJobRunner<V exte
}
} else {
if (vertex.compareTo(currentVertex) > 0) {
- throw new IOException("The records of split aren't in order by vertex ID.");
+ throw new IOException("The records of split aren't in order by vertex ID.");
}
-
+
if (selfReference) {
vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
}
Added: hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexMessages.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexMessages.java?rev=1630222&view=auto
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexMessages.java (added)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexMessages.java Wed Oct 8 20:03:34 2014
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hama.graph;
+
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.hama.bsp.BSPPeer;
+
+/**
+ * Iterates over the whole sorted queue of incoming messages of a peer. Each
+ * vertex can call the <code>hasNext()</code> and <code>next()</code> methods
+ * to consume its messages. The iterator detects when the messages of a
+ * specific vertex ID have been consumed and then locks itself; the messages of
+ * the next vertex ID are unlocked through the <code>continueWith()</code>
+ * method.
+ *
+ * @param <V> type of the vertex ID the messages are addressed to
+ * @param <T> type of the message value handed out by <code>next()</code>
+ */
+public class VertexMessages<V, T> implements Iterator<T>, Iterable<T> {
+  private final BSPPeer<?, ?, ?, ?, GraphJobMessage> peer; // message source
+  private V vid = null; // vertex ID whose messages are currently unlocked
+  private GraphJobMessage currentMessage = null; // fetched, not yet consumed
+  private boolean locked = true; // while true, hasNext() reports no messages
+
+  public VertexMessages(BSPPeer<?, ?, ?, ?, GraphJobMessage> peer) {
+    this.peer = peer;
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (locked) {
+      return false;
+    }
+
+    try {
+      if (this.currentMessage == null) {
+        this.currentMessage = this.peer.getCurrentMessage();
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    if (this.currentMessage != null && this.currentMessage.getVertexId().equals(this.vid)) {
+      return true;
+    }
+    // When a new ID has shown up or the messages are finished,
+    // we lock the iterator
+    this.locked = true;
+    return false;
+  }
+
+  @Override
+  public T next() {
+    if (this.currentMessage == null || this.locked) {
+      return null; // nothing unlocked: returns null rather than throwing
+    }
+
+    // Dispose of the current message and prepare for the next hasNext() call
+    try {
+      return (T) this.currentMessage.getVertexValue();
+    } finally {
+      this.currentMessage = null;
+    }
+  }
+
+  /**
+   * Because this class implements both <code>Iterator</code> and
+   * <code>Iterable</code>, it cannot be re-iterated and the messages can be
+   * accessed only once. In our case this is fine.
+   *
+   * @return a one-time iterator (this very instance)
+   */
+  @Override
+  public Iterator<T> iterator() {
+    return this;
+  }
+
+  /**
+   * This method should be used only after initialization. If another message
+   * already exists in the memory of the iterator, or <code>msg</code> is null,
+   * the call is ignored.
+   *
+   * @param msg The message to be prepended just after initialization
+   */
+  public void prependMessage(GraphJobMessage msg) {
+    if (this.currentMessage == null && msg != null) {
+      this.currentMessage = msg;
+    }
+  }
+
+  /**
+   * Check the vertexID target of the current message that is loaded in the
+   * iterator and unlock the iterator only if the <code>vid</code> argument is
+   * matching.
+   *
+   * @param vid the vertex ID whose messages should be unlocked
+   * @return true if the <code>vid</code> is equal to the next message's ID
+   */
+  public boolean continueWith(V vid) {
+    // Normally when we call this method this.locked == true
+    this.vid = vid;
+    this.locked = false;
+
+    if (this.currentMessage == null) {
+      // Get next message (if there is) and decide based on the new vid
+      return this.hasNext();
+    }
+
+    // If we have a message already loaded
+    if (!this.currentMessage.getVertexId().equals(vid)) {
+      this.locked = true;
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Consume (discard) the incoming messages until we find a message that has a
+   * target equal to the <code>vid</code> argument.
+   *
+   * @param vid the vertex ID to look for
+   * @return true if such a message is loaded, false if the queue ran out
+   */
+  public boolean continueUntil(V vid) {
+    this.vid = vid; // hasNext() must match against the requested ID
+    while (this.currentMessage == null
+        || !this.currentMessage.getVertexId().equals(vid)) {
+      try {
+        // Drop the non-matching message (if any) by loading the next one;
+        // without this the loop would spin forever on the same message
+        this.currentMessage = this.peer.getCurrentMessage();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      if (this.currentMessage == null) {
+        this.locked = true;
+        return false;
+      }
+    }
+    this.locked = false;
+    return true;
+  }
+
+  public void dumpRest() { // discard what is left for the current vertex ID
+    while(this.hasNext()) {
+      this.next();
+    }
+  }
+
+  /**
+   * Return the target Vertex ID of the current loaded message.
+   *
+   * @return the ID of the loaded message, or null if none is loaded
+   */
+  public V getMessageVID() {
+    return this.currentMessage == null ? null : (V) this.currentMessage.getVertexId();
+  }
+}