You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2011/09/16 22:56:44 UTC

svn commit: r1171776 - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/examples/ src/main/java/org/apache/giraph/graph/ src/test/java/org/apache/giraph/

Author: aching
Date: Fri Sep 16 20:56:44 2011
New Revision: 1171776

URL: http://svn.apache.org/viewvc?rev=1171776&view=rev
Log:
GIRAPH-34: Failure of Vertex reflection for putVertexList from
GIRAPH-27. (aching)


Modified:
    incubator/giraph/trunk/CHANGELOG
    incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexRangeBalancer.java

Modified: incubator/giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1171776&r1=1171775&r2=1171776&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Fri Sep 16 20:56:44 2011
@@ -2,6 +2,9 @@ Giraph Change Log
 
 Release 0.70.0 - unreleased
 
+  GIRAPH-34: Failure of Vertex reflection for putVertexList from
+  GIRAPH-27. (aching)
+
   GIRAPH-35: Modifying the site to indicate that Jake Mannix and
   Dmitriy Ryaboy are now Giraph committers. (aching)
 

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1171776&r1=1171775&r2=1171776&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Fri Sep 16 20:56:44 2011
@@ -37,13 +37,13 @@ import java.util.TreeSet;
 import org.apache.log4j.Logger;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.BasicVertex;
 import org.apache.giraph.graph.BspUtils;
-import org.apache.giraph.graph.VertexCombiner;
 import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.graph.MutableVertex;
-import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexCombiner;
 import org.apache.giraph.graph.VertexMutations;
 import org.apache.giraph.graph.VertexRange;
 import org.apache.giraph.graph.VertexResolver;
@@ -423,6 +423,15 @@ public abstract class BasicRPCCommunicat
         InetSocketAddress addr,
         int numHandlers, String jobId, J jobToken) throws IOException;
 
+    /**
+     * Only constructor.
+     *
+     * @param context Context for getting configuration
+     * @param service Service worker to get the vertex ranges
+     * @throws IOException
+     * @throws UnknownHostException
+     * @throws InterruptedException
+     */
     public BasicRPCCommunications(Mapper<?, ?, ?, ?>.Context context,
                                   CentralizedServiceWorker<I, V, E, M> service)
             throws IOException, UnknownHostException, InterruptedException {
@@ -804,8 +813,8 @@ end[HADOOP_FACEBOOK]*/
     public final void sendMessageReq(I destVertex, M msg) {
         InetSocketAddress addr = getInetSocketAddress(destVertex);
         if (LOG.isDebugEnabled()) {
-            LOG.debug("sendMessage: Send bytes (" + msg.toString() + ") to " +
-                      destVertex + " with address " + addr);
+            LOG.debug("sendMessage: Send bytes (" + msg.toString() +
+                      ") to " + destVertex + " with address " + addr);
         }
         ++totalMsgsSentInSuperstep;
         Map<I, MsgList<M>> msgMap = null;
@@ -825,8 +834,10 @@ end[HADOOP_FACEBOOK]*/
                 msgMap.put(destVertex, msgList);
             }
             msgList.add(msg);
-            LOG.debug("sendMessage: added msg=" + msg + ", size=" +
-                      msgList.size());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("sendMessage: added msg=" + msg + ", size=" +
+                          msgList.size());
+            }
             if (msgList.size() > maxSize) {
                 peerThreads.get(addr).flushLargeMsgList(destVertex);
             }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java?rev=1171776&r1=1171775&r2=1171776&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java Fri Sep 16 20:56:44 2011
@@ -124,12 +124,13 @@ public class SimpleCheckpointVertex exte
             System.out.println("compute: vertex " + getVertexId() +
                                " sending edgeValue " + edgeValue +
                                " vertexValue " + vertexValue +
-                               " total " + (edgeValue.get() + (float) vertexValue) +
+                               " total " + (edgeValue.get() +
+                               (float) vertexValue) +
                                " to vertex " + targetVertexId +
                                " on superstep " + getSuperstep());
             edgeValue.set(edgeValue.get() + (float) vertexValue);
             addEdge(targetVertexId, edgeValue);
-            sendMsg(targetVertexId, edgeValue);
+            sendMsg(targetVertexId, new FloatWritable(edgeValue.get()));
         }
     }
 

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java?rev=1171776&r1=1171775&r2=1171776&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java Fri Sep 16 20:56:44 2011
@@ -18,6 +18,8 @@
 
 package org.apache.giraph.graph;
 
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -37,9 +39,11 @@ import java.util.List;
 @SuppressWarnings("rawtypes")
 public abstract class BasicVertex<I extends WritableComparable,
         V extends Writable, E extends Writable, M extends Writable>
-        implements AggregatorUsage, Iterable<I> {
+        implements AggregatorUsage, Iterable<I>, Configurable {
     /** Global graph state **/
     private GraphState<I,V,E,M> graphState;
+    /** Configuration */
+    private Configuration conf;
 
     /**
      * Optionally defined by the user to be executed once on all workers
@@ -160,10 +164,12 @@ public abstract class BasicVertex<I exte
     public abstract int getNumOutEdges();
 
     /**
-     * Send a message to a vertex id.
+     * Send a message to a vertex id.  The message should not be mutated after
+     * this method returns or else undefined results could occur.
      *
-     * @param id vertex id to send the message to
-     * @param msg message data to send
+     * @param id Vertex id to send the message to
+     * @param msg Message data to send.  Note that after the message is sent,
+     *        the user should not modify the object.
      */
     public void sendMsg(I id, M msg) {
         if (msg == null) {
@@ -182,8 +188,8 @@ public abstract class BasicVertex<I exte
     /**
      * After this is called, the compute() code will no longer be called for
      * this vertice unless a message is sent to it.  Then the compute() code
-     * will be called once again until this function is called.  The application
-     * finishes only when all vertices vote to halt.
+     * will be called once again until this function is called.  The
+     * application finishes only when all vertices vote to halt.
      */
     public abstract void voteToHalt();
 
@@ -245,4 +251,14 @@ public abstract class BasicVertex<I exte
         return getGraphState().getGraphMapper().getAggregatorUsage().
             useAggregator(name);
     }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+    }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java?rev=1171776&r1=1171775&r2=1171776&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java Fri Sep 16 20:56:44 2011
@@ -145,7 +145,7 @@ public abstract class Vertex<I extends W
     @Override
     public E removeEdge(I targetVertexId) {
         Edge<I, E> edge = destEdgeMap.remove(targetVertexId);
-        if(edge != null) {
+        if (edge != null) {
             return edge.getEdgeValue();
         } else {
             return null;
@@ -175,26 +175,23 @@ public abstract class Vertex<I extends W
 
     @Override
     final public void readFields(DataInput in) throws IOException {
-        vertexId =
-            BspUtils.<I>createVertexIndex(getContext().getConfiguration());
+        vertexId = BspUtils.<I>createVertexIndex(getConf());
         vertexId.readFields(in);
         boolean hasVertexValue = in.readBoolean();
         if (hasVertexValue) {
-            vertexValue =
-                BspUtils.<V>createVertexValue(getContext().getConfiguration());
+            vertexValue = BspUtils.<V>createVertexValue(getConf());
             vertexValue.readFields(in);
         }
         long edgeMapSize = in.readLong();
         for (long i = 0; i < edgeMapSize; ++i) {
             Edge<I, E> edge = new Edge<I, E>();
-            edge.setConf(getContext().getConfiguration());
+            edge.setConf(getConf());
             edge.readFields(in);
             addEdge(edge.getDestVertexId(), edge.getEdgeValue());
         }
         long msgListSize = in.readLong();
         for (long i = 0; i < msgListSize; ++i) {
-            M msg =
-                BspUtils.<M>createMessageValue(getContext().getConfiguration());
+            M msg = BspUtils.<M>createMessageValue(getConf());
             msg.readFields(in);
             msgList.add(msg);
         }

Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexRangeBalancer.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexRangeBalancer.java?rev=1171776&r1=1171775&r2=1171776&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexRangeBalancer.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexRangeBalancer.java Fri Sep 16 20:56:44 2011
@@ -71,7 +71,7 @@ public class TestVertexRangeBalancer ext
         removeAndSetOutput(job, outputPath);
         assertTrue(job.run(true));
         FileSystem hdfs = FileSystem.get(job.getConfiguration());
-        final int correctLen = 118;
+        final int correctLen = 123;
         if (getJobTracker() != null) {
             FileStatus [] fileStatusArr = hdfs.listStatus(outputPath);
             int totalLen = 0;