You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ap...@apache.org on 2012/08/17 12:16:40 UTC

svn commit: r1374192 - in /giraph/trunk: CHANGELOG src/main/java/org/apache/giraph/graph/BspServiceWorker.java

Author: apresta
Date: Fri Aug 17 10:16:40 2012
New Revision: 1374192

URL: http://svn.apache.org/viewvc?rev=1374192&view=rev
Log:
GIRAPH-302: Thread safety issue with sending partitions around. (aching via apresta)

Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1374192&r1=1374191&r2=1374192&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Fri Aug 17 10:16:40 2012
@@ -1,6 +1,9 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-302: Thread safety issue with sending partitions around.
+  (aching via apresta)
+
   GIRAPH-303: Regression: cleanup phase happens earlier than it
   should. (majakabiljo via apresta)
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1374192&r1=1374191&r2=1374192&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Fri Aug 17 10:16:40 2012
@@ -311,11 +311,10 @@ public class BspServiceWorker<I extends 
       if (!entry.getValue().getVertices().isEmpty()) {
         commService.sendPartitionRequest(entry.getKey().getWorkerInfo(),
             entry.getValue());
-        entry.getValue().getVertices().clear();
       }
     }
-    commService.flush();
     inputSplitCache.clear();
+    commService.flush();
 
     return vertexEdgeCount;
   }
@@ -470,7 +469,7 @@ public class BspServiceWorker<I extends 
       if (transferRegulator.transferThisPartition(partitionOwner)) {
         commService.sendPartitionRequest(partitionOwner.getWorkerInfo(),
             partition);
-        partition.getVertices().clear();
+        inputSplitCache.remove(partitionOwner);
       }
       ++totalVerticesLoaded;
       totalEdgesLoaded += readerVertex.getNumEdges();
@@ -960,6 +959,12 @@ public class BspServiceWorker<I extends 
     //    of this worker
     // 5. Let the master know it is finished.
     // 6. Wait for the master's global stats, and check if done
+
+    getContext().setStatus("Flushing started: " +
+        getGraphMapper().getMapFunctions().toString() +
+        " - Attempt=" + getApplicationAttempt() +
+        ", Superstep=" + getSuperstep());
+
     long workerSentMessages = 0;
     try {
       commService.flush();
@@ -975,7 +980,8 @@ public class BspServiceWorker<I extends 
     }
 
     if (LOG.isInfoEnabled()) {
-      LOG.info("finishSuperstep: Superstep " + getSuperstep() + " " +
+      LOG.info("finishSuperstep: Superstep " + getSuperstep() +
+          " , mesages = " + workerSentMessages + " " +
           MemoryUtils.getRuntimeMemoryStats());
     }