You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by an...@apache.org on 2014/10/08 22:03:35 UTC

svn commit: r1630222 - in /hama/trunk: CHANGES.txt graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java graph/src/main/java/org/apache/hama/graph/VertexMessageIterable.java graph/src/main/java/org/apache/hama/graph/VertexMessages.java

Author: andronat
Date: Wed Oct  8 20:03:34 2014
New Revision: 1630222

URL: http://svn.apache.org/r1630222
Log:
HAMA-921

Added:
    hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexMessages.java
Removed:
    hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexMessageIterable.java
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1630222&r1=1630221&r2=1630222&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Wed Oct  8 20:03:34 2014
@@ -17,6 +17,7 @@ Release 0.7.0 (unreleased changes)
    HAMA-885: Semi-Clustering is not producing expected output (Renil J via edwardyoon)
 
   IMPROVEMENTS
+   HAMA-921: Polish doSuperstep() function and VertexMessageIterable class (Anastasis Andronidis)
    HAMA-913: Add RPC implementation using netty(bsmin)  
    HAMA-914: Boolean flag (isCompressed) is required only when runtime compression is enabled (edwardyoon)
    HAMA-910: Web UI Improvement (Victor Lee via edwardyoon)

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1630222&r1=1630221&r2=1630222&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Wed Oct  8 20:03:34 2014
@@ -20,7 +20,6 @@ package org.apache.hama.graph;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Map.Entry;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -154,6 +153,8 @@ public final class GraphJobRunner<V exte
   /**
    * Just write <ID as Writable, Value as Writable> pair as a result. Note that
    * this will also be executed when failure happened.
+   * @param peer
+   * @throws java.io.IOException
    */
   @Override
   public final void cleanup(
@@ -203,7 +204,7 @@ public final class GraphJobRunner<V exte
       if (firstVertexMessage != null) {
         peer.send(peer.getPeerName(), firstVertexMessage);
       }
-      GraphJobMessage msg = null;
+      GraphJobMessage msg;
       while ((msg = peer.getCurrentMessage()) != null) {
         peer.send(peer.getPeerName(), msg);
       }
@@ -221,104 +222,68 @@ public final class GraphJobRunner<V exte
   /**
    * Do the main logic of a superstep, namely checking if vertices are active,
    * feeding compute with messages and controlling combiners/aggregators.
+   * We iterate over our messages and vertices in sorted order. That means
+   * that we need to seek the first vertex that has the same ID as the
+   * iterated message.
    */
-  @SuppressWarnings("unchecked")
   private void doSuperstep(GraphJobMessage currentMessage,
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
     int activeVertices = 0;
     this.changedVertexCnt = 0;
-    vertices.startSuperstep();
+    this.vertices.startSuperstep();
 
-    /*
-     * We iterate over our messages and vertices in sorted order. That means
-     * that we need to seek the first vertex that has the same ID as the
-     * currentMessage or the first vertex that is active.
-     */
-    IDSkippingIterator<V, E, M> iterator = vertices.skippingIterator();
-    VertexMessageIterable<V, M> iterable = null;
-    Vertex<V, E, M> vertex = null;
+    IDSkippingIterator<V, E, M> iterator = this.vertices.skippingIterator();
+    VertexMessages<V, M> queueMessages = new VertexMessages<V, M>(peer);
+    queueMessages.prependMessage(currentMessage);
 
     // note that can't skip inactive vertices because we have to rewrite the
     // complete vertex file in each iteration
-    while (iterator.hasNext(
-        currentMessage == null ? null : (V) currentMessage.getVertexId(),
-        Strategy.ALL)) {
-
-      vertex = iterator.next();
-      if (currentMessage != null) {
-        iterable = iterate(currentMessage, (V) currentMessage.getVertexId(),
-            vertex, peer);
-      } else {
-        iterable = null;
-      }
+    V firstVID = currentMessage == null ? null : (V) currentMessage.getVertexId();
+    while (iterator.hasNext(firstVID, Strategy.ALL)) {
+      Vertex<V, E, M> vertex = iterator.next();
+      boolean msgsExist = queueMessages.continueWith(vertex.getVertexID());
 
-      if (iterable != null && vertex.isHalted()) {
+      if (!msgsExist) checkMsgOrder(vertex.getVertexID(), queueMessages);
+
+      if (msgsExist && vertex.isHalted()) {
         vertex.setActive();
       }
 
       if (!vertex.isHalted()) {
-        if (iterable == null) {
-          vertex.compute(Collections.<M> emptyList());
-        } else {
-          vertex.compute(iterable);
-          currentMessage = iterable.getOverflowMessage();
-        }
+        vertex.compute(queueMessages);
         activeVertices++;
       }
 
+      // Dump remaining messages
+      queueMessages.dumpRest();
+
       // note that we even need to rewrite the vertex if it is halted for
       // consistency reasons
-      vertices.finishVertexComputation(vertex);
+      this.vertices.finishVertexComputation(vertex);
     }
-    vertices.finishSuperstep();
+    this.vertices.finishSuperstep();
 
     getAggregationRunner().sendAggregatorValues(peer, activeVertices,
         this.changedVertexCnt);
-    iteration++;
+    this.iteration++;
   }
 
   /**
-   * Iterating utility that ensures following things: <br/>
-   * - if vertex is active, but the given message does not match the vertexID,
-   * return null. <br/>
-   * - if vertex is inactive, but received a message that matches the ID, build
-   * an iterator that can be iterated until the next vertex has been reached
-   * (not buffer in memory) and set the vertex active <br/>
-   * - if vertex is active, and the given message does match the vertexID,
-   * return an iterator that can be iterated until the next vertex has been
-   * reached. <br/>
-   * - if vertex is inactive, and received no message, return null.
+   * Utility that ensures that the incoming messages have a target vertex.
    */
-  @SuppressWarnings("unchecked")
-  private VertexMessageIterable<V, M> iterate(GraphJobMessage currentMessage,
-      V firstMessageId, Vertex<V, E, M> vertex,
-      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
-    int comparision = firstMessageId.compareTo(vertex.getVertexID());
-    if (conf.getBoolean("hama.check.missing.vertex", true)) {
-      if (comparision < 0) {
+  private void checkMsgOrder(V vid, VertexMessages<V, M> vm) {
+    // When the vid is greater than the current message's target ID, it means
+    // that a vertex has sent a message to another vertex that doesn't exist
+    if (vm.getMessageVID() != null && vm.getMessageVID().compareTo(vid) < 0) {
+      if (conf.getBoolean("hama.check.missing.vertex", true)) {
         throw new IllegalArgumentException(
-          "A message has recieved with a destination ID: " + firstMessageId +
-          " that does not exist! (Vertex iterator is at" + vertex.getVertexID() 
-          + " ID)");
-      }
-    } else {
-      while (comparision < 0) {
-        VertexMessageIterable<V, M> messageIterable = new VertexMessageIterable<V, M>(
-            currentMessage, firstMessageId, peer);
-        currentMessage = messageIterable.getOverflowMessage();
-        firstMessageId = (V) currentMessage.getVertexId();
-        comparision = firstMessageId.compareTo(vertex.getVertexID());
-      }
-    }
-    if (comparision == 0) {
-      // vertex id matches with the vertex, return an iterator with newest
-      // message
-      return new VertexMessageIterable<V, M>(currentMessage,
-          vertex.getVertexID(), peer);
-    } else {
-      // return null
-      return null;
+                "A message has recieved with a destination ID: " + vm.getMessageVID()
+                + " that does not exist! (Vertex iterator is at" + vid + " ID)");
+      } else {
+        // Skip all unrecognized messages until we find a match
+        vm.continueUntil(vid);
+      }
     }
   }
 
@@ -431,9 +396,9 @@ public final class GraphJobRunner<V exte
           }
         } else {
           if (vertex.compareTo(currentVertex) > 0) {
-            throw new IOException("The records of split aren't in order by vertex ID.");  
+            throw new IOException("The records of split aren't in order by vertex ID.");
           }
-          
+
           if (selfReference) {
             vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
           }

Added: hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexMessages.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexMessages.java?rev=1630222&view=auto
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexMessages.java (added)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexMessages.java Wed Oct  8 20:03:34 2014
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hama.graph;
+
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.hama.bsp.BSPPeer;
+
+/**
+ * This class iterates over the whole sorted queue of incoming messages. Each
+ * vertex can call the <code>hasNext()</code> and <code>next()</code> methods
+ * to consume its messages. The iterator is responsible for detecting when the
+ * messages of a specific Vertex ID have been consumed; the messages of the
+ * next Vertex ID are then unlocked through the <code>continueWith()</code>
+ * method.
+ *
+ * @param <V> the vertex ID type
+ * @param <T> the message value type returned by <code>next()</code>
+ */
+public class VertexMessages<V, T> implements Iterator<T>, Iterable<T> {
+  private final BSPPeer<?, ?, ?, ?, GraphJobMessage> peer;
+  private V vid = null;
+  private GraphJobMessage currentMessage = null;
+  private boolean locked = true;
+
+  public VertexMessages(BSPPeer<?, ?, ?, ?, GraphJobMessage> peer) {
+    this.peer = peer;
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (locked) {
+      return false;
+    }
+
+    try {
+      if (this.currentMessage == null) {
+        this.currentMessage = this.peer.getCurrentMessage();
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    if (this.currentMessage != null && this.currentMessage.getVertexId().equals(this.vid)) {
+      return true;
+    }
+    // When a new ID has shown up or the messages are finished,
+    // we lock the iterator
+    this.locked = true;
+    return false;
+  }
+
+  @Override
+  public T next() {
+    if (this.currentMessage == null || this.locked) {
+      return null;
+    }
+
+    // Dispose of the current message and prepare for the next hasNext() call
+    try {
+      return (T) this.currentMessage.getVertexValue();
+    } finally {
+      this.currentMessage = null;
+    }
+  }
+
+  /**
+   * By implementing both <code>Iterator</code> and <code>Iterable</code>
+   * interfaces, this class cannot be re-iterated and the messages can be
+   * accessed only once. In our case this is fine.
+   *
+   * @return a one-time iterator (this same instance)
+   */
+  @Override
+  public Iterator<T> iterator() {
+    return this;
+  }
+
+  /**
+   * This method should be used only after initialization. If another message
+   * is already loaded in the iterator, or <code>msg</code> is null, the call
+   * is ignored.
+   *
+   * @param msg The message to be prepended just after initialization
+   */
+  public void prependMessage(GraphJobMessage msg) {
+    if (this.currentMessage == null && msg != null) {
+      this.currentMessage = msg;
+    }
+  }
+
+  /**
+   * Check the vertexID target of the current message that is loaded in the
+   * iterator and unlock the iterator only if the <code>vid</code> argument is
+   * matching.
+   *
+   * @param vid the vertex ID whose messages should be unlocked
+   * @return return true if the <code>vid</code> is equal to the next message's ID
+   */
+  public boolean continueWith(V vid) {
+    // Normally when we call this method this.locked == true
+    this.vid = vid;
+    this.locked = false;
+
+    if (this.currentMessage == null) {
+      // Get next message (if there is) and decide based on the new vid
+      return this.hasNext();
+    }
+
+    // If we have a message already loaded
+    if (!this.currentMessage.getVertexId().equals(vid)) {
+      this.locked = true;
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Discards incoming messages until one targeted at <code>vid</code> is
+   * loaded, then unlocks the iterator for that vertex. Every non-matching
+   * message is dropped so the loop always advances through the queue.
+   *
+   * @param vid the vertex ID whose messages should be unlocked
+   * @return true if a message for <code>vid</code> is now loaded, false if
+   *         the queue was exhausted (the iterator is then locked)
+   */
+  public boolean continueUntil(V vid) {
+    this.vid = vid;
+    try {
+      if (this.currentMessage == null) {
+        this.currentMessage = this.peer.getCurrentMessage();
+      }
+      while (this.currentMessage != null
+          && !this.currentMessage.getVertexId().equals(vid)) {
+        // Drop the unmatched message and fetch the next one
+        this.currentMessage = this.peer.getCurrentMessage();
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    this.locked = this.currentMessage == null;
+    return !this.locked;
+  }
+
+  public void dumpRest() {
+    while(this.hasNext()) {
+      this.next();
+    }
+  }
+
+  /**
+   * Return the target Vertex ID of the current loaded message.
+   *
+   * @return the vertex ID of the loaded message, or null if none is loaded
+   */
+  public V getMessageVID() {
+    return this.currentMessage == null ? null : (V) this.currentMessage.getVertexId();
+  }
+}