You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain-text copy.
Posted to commits@giraph.apache.org by ac...@apache.org on 2011/11/15 23:52:55 UTC

svn commit: r1202455 - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/graph/ src/main/java/org/apache/giraph/graph/partition/

Author: aching
Date: Tue Nov 15 22:52:54 2011
New Revision: 1202455

URL: http://svn.apache.org/viewvc?rev=1202455&view=rev
Log:
GIRAPH-88: Message count not updated properly after GIRAPH-11. (aching)


Modified:
    incubator/giraph/trunk/CHANGELOG
    incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java

Modified: incubator/giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1202455&r1=1202454&r2=1202455&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Tue Nov 15 22:52:54 2011
@@ -2,6 +2,8 @@ Giraph Change Log
 
 Release 0.70.0 - unreleased
 
+  GIRAPH-88: Message count not updated properly after GIRAPH-11. (aching)
+
   GIRAPH-70: Misspellings in PseudoRandomVertexInputFormat configuration
   parameters. (attilacsordas via jghoman)
 

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java?rev=1202455&r1=1202454&r2=1202455&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java Tue Nov 15 22:52:54 2011
@@ -106,11 +106,9 @@ public interface CentralizedServiceWorke
      * worker level statistics after the computation.
      *
      * @param partitionStatsList All the partition stats for this worker
-     * @param workersSentMessages Number of messages sent on this worker
      * @return true if this is the last superstep, false otherwise
      */
-    boolean finishSuperstep(List<PartitionStats> partitionStatsList,
-                            long workersSentMessages);
+    boolean finishSuperstep(List<PartitionStats> partitionStatsList);
     /**
      * Get the partition that a vertex index would belong to
      *

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1202455&r1=1202454&r2=1202455&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Tue Nov 15 22:52:54 2011
@@ -548,7 +548,7 @@ public class BspServiceWorker<
         workerGraphPartitioner.finalizePartitionStats(
             partitionStatsList, workerPartitionMap);
 
-        finishSuperstep(partitionStatsList, 0);
+        finishSuperstep(partitionStatsList);
     }
 
     /**
@@ -773,8 +773,7 @@ public class BspServiceWorker<
     }
 
     @Override
-    public boolean finishSuperstep(List<PartitionStats> partitionStatsList,
-                                   long workersSentMessages) {
+    public boolean finishSuperstep(List<PartitionStats> partitionStatsList) {
         // This barrier blocks until success (or the master signals it to
         // restart).
         //
@@ -785,8 +784,9 @@ public class BspServiceWorker<
         // of this worker
         // 3. Let the master know it is finished.
         // 4. Then it waits for the master to say whether to stop or not.
+        long workerSentMessages = 0;
         try {
-            commService.flush(getContext());
+            workerSentMessages = commService.flush(getContext());
         } catch (IOException e) {
             throw new IllegalStateException(
                 "finishSuperstep: flush failed", e);
@@ -807,7 +807,7 @@ public class BspServiceWorker<
             workerFinishedInfoObj.put(JSONOBJ_PARTITION_STATS_KEY,
                                       Base64.encodeBytes(partitionStatsBytes));
             workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGES_KEY,
-                                      workersSentMessages);
+                                      workerSentMessages);
         } catch (JSONException e) {
             throw new RuntimeException(e);
         }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1202455&r1=1202454&r2=1202455&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Tue Nov 15 22:52:54 2011
@@ -512,7 +512,6 @@ public class GraphMapper<I extends Writa
 
         List<PartitionStats> partitionStatsList =
             new ArrayList<PartitionStats>();
-        long workerSentMessages = 0;
         do {
             long superstep = serviceWorker.getSuperstep();
 
@@ -556,7 +555,6 @@ public class GraphMapper<I extends Writa
             context.progress();
 
             partitionStatsList.clear();
-            workerSentMessages = 0;
             for (Partition<I, V, E, M> partition :
                     serviceWorker.getPartitionMap().values()) {
                 PartitionStats partitionStats =
@@ -593,8 +591,7 @@ public class GraphMapper<I extends Writa
                          " maxMem=" + Runtime.getRuntime().maxMemory() +
                          " freeMem=" + Runtime.getRuntime().freeMemory());
             }
-        } while (!serviceWorker.finishSuperstep(partitionStatsList,
-                                                workerSentMessages));
+        } while (!serviceWorker.finishSuperstep(partitionStatsList));
         if (LOG.isInfoEnabled()) {
             LOG.info("map: BSP application done " +
                      "(global vertices marked done)");

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java?rev=1202455&r1=1202454&r2=1202455&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java Tue Nov 15 22:52:54 2011
@@ -32,8 +32,8 @@ import org.apache.hadoop.io.WritableComp
  * range partitioning is more susceptible to hot spots if the keys
  * are not randomly distributed.  Another negative is the user must implement
  * some of the functionality around how to split the key range.
- * 
- * Note:  This implementation is incomplete, the developer must implement the 
+ *
+ * Note:  This implementation is incomplete, the developer must implement the
  * various methods based on their index type.
  *
  * @param <I> Vertex index value
@@ -46,7 +46,7 @@ public abstract class RangeWorkerPartiti
         V extends Writable, E extends Writable, M extends Writable> implements
         WorkerGraphPartitioner<I, V, E, M> {
     /** Mapping of the vertex ids to the {@link PartitionOwner} */
-    private NavigableMap<I, RangePartitionOwner<I>> vertexRangeMap =
+    protected NavigableMap<I, RangePartitionOwner<I>> vertexRangeMap =
         new TreeMap<I, RangePartitionOwner<I>>();
 
     @Override