You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@giraph.apache.org by ap...@apache.org on 2012/08/17 12:16:40 UTC
svn commit: r1374192 - in /giraph/trunk: CHANGELOG
src/main/java/org/apache/giraph/graph/BspServiceWorker.java
Author: apresta
Date: Fri Aug 17 10:16:40 2012
New Revision: 1374192
URL: http://svn.apache.org/viewvc?rev=1374192&view=rev
Log:
GIRAPH-302: Thread safety issue with sending partitions around. (aching via apresta)
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1374192&r1=1374191&r2=1374192&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Fri Aug 17 10:16:40 2012
@@ -1,6 +1,9 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-302: Thread safety issue with sending partitions around.
+ (aching via apresta)
+
GIRAPH-303: Regression: cleanup phase happens earlier than it
should. (majakabiljo via apresta)
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1374192&r1=1374191&r2=1374192&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Fri Aug 17 10:16:40 2012
@@ -311,11 +311,10 @@ public class BspServiceWorker<I extends
if (!entry.getValue().getVertices().isEmpty()) {
commService.sendPartitionRequest(entry.getKey().getWorkerInfo(),
entry.getValue());
- entry.getValue().getVertices().clear();
}
}
- commService.flush();
inputSplitCache.clear();
+ commService.flush();
return vertexEdgeCount;
}
@@ -470,7 +469,7 @@ public class BspServiceWorker<I extends
if (transferRegulator.transferThisPartition(partitionOwner)) {
commService.sendPartitionRequest(partitionOwner.getWorkerInfo(),
partition);
- partition.getVertices().clear();
+ inputSplitCache.remove(partitionOwner);
}
++totalVerticesLoaded;
totalEdgesLoaded += readerVertex.getNumEdges();
@@ -960,6 +959,12 @@ public class BspServiceWorker<I extends
// of this worker
// 5. Let the master know it is finished.
// 6. Wait for the master's global stats, and check if done
+
+ getContext().setStatus("Flushing started: " +
+ getGraphMapper().getMapFunctions().toString() +
+ " - Attempt=" + getApplicationAttempt() +
+ ", Superstep=" + getSuperstep());
+
long workerSentMessages = 0;
try {
commService.flush();
@@ -975,7 +980,8 @@ public class BspServiceWorker<I extends
}
if (LOG.isInfoEnabled()) {
- LOG.info("finishSuperstep: Superstep " + getSuperstep() + " " +
+ LOG.info("finishSuperstep: Superstep " + getSuperstep() +
+ " , mesages = " + workerSentMessages + " " +
MemoryUtils.getRuntimeMemoryStats());
}