You are viewing a plain-text version of this content. The hyperlink to its canonical version was not preserved in this plain-text copy.
Posted to commits@hama.apache.org by ed...@apache.org on 2013/12/23 05:56:29 UTC

svn commit: r1553069 - in /hama/trunk: CHANGES.txt graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java graph/src/test/java/org/apache/hama/graph/example/PageRank.java

Author: edwardyoon
Date: Mon Dec 23 04:56:29 2013
New Revision: 1553069

URL: http://svn.apache.org/r1553069
Log:
HAMA-831: Support for multiple records with the same vertexID (edwardyoon)

Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1553069&r1=1553068&r2=1553069&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Mon Dec 23 04:56:29 2013
@@ -11,6 +11,7 @@ Release 0.7.0 (unreleased changes)
 
   BUG FIXES
 
+   HAMA-831: Support for multi records with same vertexID (edwardyoon)
    HAMA-830: KMeans and NeuralNetwork doesn't load config file (edwardyoon)
    HAMA-812: In local mode BSPJobClient.close throws Exception (Martin Illecker)
    HAMA-821: Fix bugs in KMeans example and make output more readable (edwardyoon)

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1553069&r1=1553068&r2=1553069&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Mon Dec 23 04:56:29 2013
@@ -71,11 +71,16 @@ public final class GraphJobRunner<V exte
   public static final String S_FLAG_VERTEX_TOTAL_VERTICES = "hama.6";
   public static final String S_FLAG_AGGREGATOR_SKIP = "hama.7";
   public static final Text FLAG_MESSAGE_COUNTS = new Text(S_FLAG_MESSAGE_COUNTS);
-  public static final Text FLAG_VERTEX_INCREASE = new Text(S_FLAG_VERTEX_INCREASE);
-  public static final Text FLAG_VERTEX_DECREASE = new Text(S_FLAG_VERTEX_DECREASE);
-  public static final Text FLAG_VERTEX_ALTER_COUNTER = new Text(S_FLAG_VERTEX_ALTER_COUNTER);
-  public static final Text FLAG_VERTEX_TOTAL_VERTICES = new Text(S_FLAG_VERTEX_TOTAL_VERTICES);
-  public static final Text FLAG_AGGREGATOR_SKIP = new Text(S_FLAG_AGGREGATOR_SKIP);
+  public static final Text FLAG_VERTEX_INCREASE = new Text(
+      S_FLAG_VERTEX_INCREASE);
+  public static final Text FLAG_VERTEX_DECREASE = new Text(
+      S_FLAG_VERTEX_DECREASE);
+  public static final Text FLAG_VERTEX_ALTER_COUNTER = new Text(
+      S_FLAG_VERTEX_ALTER_COUNTER);
+  public static final Text FLAG_VERTEX_TOTAL_VERTICES = new Text(
+      S_FLAG_VERTEX_TOTAL_VERTICES);
+  public static final Text FLAG_AGGREGATOR_SKIP = new Text(
+      S_FLAG_AGGREGATOR_SKIP);
 
   public static final String MESSAGE_COMBINER_CLASS_KEY = "hama.vertex.message.combiner.class";
   public static final String VERTEX_CLASS_KEY = "hama.graph.vertex.class";
@@ -180,18 +185,24 @@ public final class GraphJobRunner<V exte
 
     if (isMasterTask(peer) && iteration == 1) {
       MapWritable updatedCnt = new MapWritable();
-      updatedCnt.put(FLAG_VERTEX_TOTAL_VERTICES, new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES).getCounter())));
+      updatedCnt.put(
+          FLAG_VERTEX_TOTAL_VERTICES,
+          new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES)
+              .getCounter())));
       // send the updates from the master tasks back to the slaves
       for (String peerName : peer.getAllPeerNames()) {
         peer.send(peerName, new GraphJobMessage(updatedCnt));
       }
     }
-    
+
     // this is only done in every second iteration
     if (isMasterTask(peer) && iteration > 1) {
       MapWritable updatedCnt = new MapWritable();
-      // send total number of vertices.      
-      updatedCnt.put(FLAG_VERTEX_TOTAL_VERTICES, new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES).getCounter())));
+      // send total number of vertices.
+      updatedCnt.put(
+          FLAG_VERTEX_TOTAL_VERTICES,
+          new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES)
+              .getCounter())));
       // exit if there's no update made
       if (globalUpdateCounts == 0) {
         updatedCnt.put(FLAG_MESSAGE_COUNTS, new IntWritable(Integer.MIN_VALUE));
@@ -217,8 +228,8 @@ public final class GraphJobRunner<V exte
       // now sync
       peer.sync();
       // now the map message must be read that might be send from the master
-      updated = getAggregationRunner().receiveAggregatedValues(peer
-          .getCurrentMessage().getMap(), iteration);
+      updated = getAggregationRunner().receiveAggregatedValues(
+          peer.getCurrentMessage().getMap(), iteration);
       // set the first vertex message back to the message it had before sync
       firstVertexMessage = peer.getCurrentMessage();
     }
@@ -281,7 +292,8 @@ public final class GraphJobRunner<V exte
     }
     vertices.finishSuperstep();
 
-    getAggregationRunner().sendAggregatorValues(peer, activeVertices, this.changedVertexCnt);
+    getAggregationRunner().sendAggregatorValues(peer, activeVertices,
+        this.changedVertexCnt);
     iteration++;
   }
 
@@ -305,15 +317,15 @@ public final class GraphJobRunner<V exte
     if (conf.getBoolean("hama.check.missing.vertex", true)) {
       if (comparision < 0) {
         throw new IllegalArgumentException(
-          "Messages must never be behind the vertex in ID! Current Message ID: "
-              + firstMessageId + " vs. " + vertex.getVertexID());
-      } 
+            "Messages must never be behind the vertex in ID! Current Message ID: "
+                + firstMessageId + " vs. " + vertex.getVertexID());
+      }
     } else {
       while (comparision < 0) {
-        VertexMessageIterable<V, M> messageIterable = new VertexMessageIterable<V, M>(currentMessage,
-          firstMessageId, peer);
+        VertexMessageIterable<V, M> messageIterable = new VertexMessageIterable<V, M>(
+            currentMessage, firstMessageId, peer);
         currentMessage = messageIterable.getOverflowMessage();
-        firstMessageId = (V)currentMessage.getVertexId();
+        firstMessageId = (V) currentMessage.getVertexId();
         comparision = firstMessageId.compareTo(vertex.getVertexID());
       }
     }
@@ -383,7 +395,9 @@ public final class GraphJobRunner<V exte
     setAggregationRunner(new AggregationRunner<V, E, M>());
     getAggregationRunner().setupAggregators(peer);
 
-    Class<? extends VerticesInfo<V, E, M>> verticesInfoClass = (Class<? extends VerticesInfo<V, E, M>>) conf.getClass("hama.graph.vertices.info", ListVerticesInfo.class, VerticesInfo.class);
+    Class<? extends VerticesInfo<V, E, M>> verticesInfoClass = (Class<? extends VerticesInfo<V, E, M>>) conf
+        .getClass("hama.graph.vertices.info", ListVerticesInfo.class,
+            VerticesInfo.class);
     vertices = ReflectionUtils.newInstance(verticesInfoClass);
     vertices.init(this, conf, peer.getTaskId());
   }
@@ -424,25 +438,44 @@ public final class GraphJobRunner<V exte
     // our VertexInputReader ensures incoming vertices are sorted by their ID
     Vertex<V, E, M> vertex = GraphJobRunner
         .<V, E, M> newVertexInstance(VERTEX_CLASS);
+    Vertex<V, E, M> currentVertex = GraphJobRunner
+        .<V, E, M> newVertexInstance(VERTEX_CLASS);
+
     KeyValuePair<Writable, Writable> record = null;
     KeyValuePair<Writable, Writable> converted = null;
+
     while ((record = peer.readNext()) != null) {
       converted = converter.convertRecord(record, conf);
-      vertex = (Vertex<V, E, M>) converted.getKey();
-      vertex.setRunner(this);
-      vertex.setup(conf);
+      currentVertex = (Vertex<V, E, M>) converted.getKey();
 
-      if (selfReference) {
-        vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
-      }
+      if (vertex.getVertexID() == null) {
+        vertex = currentVertex;
+      } else {
+        if (vertex.getVertexID().equals(currentVertex.getVertexID())) {
+          for (Edge<V, E> edge : currentVertex.getEdges()) {
+            vertex.addEdge(edge);
+          }
+        } else {
+          vertex.setRunner(this);
+          vertex.setup(conf);
 
-      vertices.addVertex(vertex);
+          if (selfReference) {
+            vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
+          }
 
-      // Reinitializing vertex object for memory based implementations of
-      // VerticesInfo
-      vertex = GraphJobRunner.<V, E, M> newVertexInstance(VERTEX_CLASS);
-      vertex.setRunner(this);
+          vertices.addVertex(vertex);
+          vertex = currentVertex;
+        }
+      }
     }
+    // add last vertex.
+    vertex.setRunner(this);
+    vertex.setup(conf);
+    if (selfReference) {
+      vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
+    }
+    vertices.addVertex(vertex);
+
     vertices.finishAdditions();
     // finish the "superstep" because we have written a new file here
     vertices.finishSuperstep();
@@ -454,32 +487,37 @@ public final class GraphJobRunner<V exte
 
   /**
    * Add new vertex into memory of each peer.
-   * @throws IOException 
+   * 
+   * @throws IOException
    */
   private void addVertex(Vertex<V, E, M> vertex) throws IOException {
     vertex.setRunner(this);
     vertex.setup(conf);
-          
+
     if (conf.getBoolean("hama.graph.self.ref", false)) {
       vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
     }
-    
-    LOG.debug("Added VertexID: " + vertex.getVertexID() + " in peer " + peer.getPeerName());
+
+    LOG.debug("Added VertexID: " + vertex.getVertexID() + " in peer "
+        + peer.getPeerName());
     vertices.addVertex(vertex);
   }
 
   /**
    * Remove vertex from this peer.
-   * @throws IOException 
+   * 
+   * @throws IOException
    */
   private void removeVertex(V vertexID) {
     vertices.removeVertex(vertexID);
-    LOG.debug("Removed VertexID: " + vertexID + " in peer " + peer.getPeerName());
+    LOG.debug("Removed VertexID: " + vertexID + " in peer "
+        + peer.getPeerName());
   }
 
   /**
    * After all inserts are done, we must finalize the VertexInfo data structure.
-   * @throws IOException 
+   * 
+   * @throws IOException
    */
   private void finishAdditions() throws IOException {
     vertices.finishAdditions();
@@ -489,7 +527,8 @@ public final class GraphJobRunner<V exte
 
   /**
    * After all inserts are done, we must finalize the VertexInfo data structure.
-   * @throws IOException 
+   * 
+   * @throws IOException
    */
   private void finishRemovals() throws IOException {
     vertices.finishRemovals();
@@ -534,8 +573,8 @@ public final class GraphJobRunner<V exte
       throws IOException, SyncException, InterruptedException {
     GraphJobMessage msg = null;
     boolean dynamicAdditions = false;
-    boolean dynamicRemovals  = false;
-    
+    boolean dynamicRemovals = false;
+
     while ((msg = peer.getCurrentMessage()) != null) {
       // either this is a vertex message or a directive that must be read
       // as map
@@ -559,8 +598,8 @@ public final class GraphJobRunner<V exte
                 (M) e.getValue());
           } else if (getAggregationRunner().isEnabled()
               && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_INCREMENT)) {
-            getAggregationRunner().masterReadAggregatedIncrementalValue(vertexID,
-                (M) e.getValue());
+            getAggregationRunner().masterReadAggregatedIncrementalValue(
+                vertexID, (M) e.getValue());
           } else if (FLAG_VERTEX_INCREASE.equals(vertexID)) {
             dynamicAdditions = true;
             addVertex((Vertex<V, E, M>) e.getValue());
@@ -571,15 +610,20 @@ public final class GraphJobRunner<V exte
             this.numberVertices = ((LongWritable) e.getValue()).get();
           } else if (FLAG_VERTEX_ALTER_COUNTER.equals(vertexID)) {
             if (isMasterTask(peer)) {
-              peer.getCounter(GraphJobCounter.INPUT_VERTICES).increment(((LongWritable) e.getValue()).get());
+              peer.getCounter(GraphJobCounter.INPUT_VERTICES).increment(
+                  ((LongWritable) e.getValue()).get());
             } else {
-              throw new UnsupportedOperationException("A message to increase vertex count is in a wrong place: " + peer);
+              throw new UnsupportedOperationException(
+                  "A message to increase vertex count is in a wrong place: "
+                      + peer);
             }
           } else if (FLAG_AGGREGATOR_SKIP.equals(vertexID)) {
             if (isMasterTask(peer)) {
-              this.getAggregationRunner().addSkipAggregator(((IntWritable) e.getValue()).get());
+              this.getAggregationRunner().addSkipAggregator(
+                  ((IntWritable) e.getValue()).get());
             } else {
-              throw new UnsupportedOperationException("A message to skip aggregators is in a wrong peer: " + peer);
+              throw new UnsupportedOperationException(
+                  "A message to skip aggregators is in a wrong peer: " + peer);
             }
           }
         }
@@ -590,14 +634,15 @@ public final class GraphJobRunner<V exte
 
     }
 
-    // If we applied any changes to vertices, we need to call finishAdditions and finishRemovals in the end.
+    // If we applied any changes to vertices, we need to call finishAdditions
+    // and finishRemovals in the end.
     if (dynamicAdditions) {
       finishAdditions();
     }
     if (dynamicRemovals) {
       finishRemovals();
     }
-    
+
     return msg;
   }
 

Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1553069&r1=1553068&r2=1553069&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java Mon Dec 23 04:56:29 2013
@@ -44,7 +44,8 @@ import org.junit.Before;
 public class TestSubmitGraphJob extends TestBSPMasterGroomServer {
 
   String[] input = new String[] { "stackoverflow.com\tyahoo.com",
-      "facebook.com\ttwitter.com\tgoogle.com\tnasa.gov",
+      "facebook.com\ttwitter.com", 
+      "facebook.com\tgoogle.com\tnasa.gov",
       "yahoo.com\tnasa.gov\tstackoverflow.com",
       "twitter.com\tgoogle.com\tfacebook.com",
       "nasa.gov\tyahoo.com\tstackoverflow.com",
@@ -52,11 +53,11 @@ public class TestSubmitGraphJob extends 
 
   private static String INPUT = "/tmp/pagerank/real-tmp.seq";
   private static String OUTPUT = "/tmp/pagerank/real-out";
-
+  @SuppressWarnings("rawtypes")
   private static final List<Class<? extends VerticesInfo>> vi = new ArrayList<Class<? extends VerticesInfo>>();
 
   @Before
-  public void setUp() throws Exception  {
+  public void setUp() throws Exception {
     super.setUp();
     vi.add(ListVerticesInfo.class);
     vi.add(DiskVerticesInfo.class);
@@ -113,9 +114,11 @@ public class TestSubmitGraphJob extends 
     }
   }
 
+  @SuppressWarnings("rawtypes")
   protected void injectVerticesInfo() {
-    Class<? extends VerticesInfo> verticesInfoClass = vi.get(Math.abs(new Random().nextInt() % 3));
-    LOG.info("using vertices info of type : "+verticesInfoClass.getName());
+    Class<? extends VerticesInfo> verticesInfoClass = vi.get(Math
+        .abs(new Random().nextInt() % 3));
+    LOG.info("using vertices info of type : " + verticesInfoClass.getName());
   }
 
   private void verifyResult() throws IOException {

Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java?rev=1553069&r1=1553068&r2=1553069&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java Mon Dec 23 04:56:29 2013
@@ -43,7 +43,7 @@ public class PageRank {
 
   public static class PageRankVertex extends
       Vertex<Text, NullWritable, DoubleWritable> {
-
+    
     static double DAMPING_FACTOR = 0.85;
     static double MAXIMUM_CONVERGENCE_ERROR = 0.001;